"""This module has MAXFile class from which is the base class for all MAX file type classes"""
import os
import errno
import shutil
from typing import Any, Optional
import dask.dataframe as dd
import numpy as np
import pandas as pd
from pyarrow.lib import ArrowInvalid, ArrowTypeError
from medicaid_utils.common_utils import dataframe_utils, links
[docs]
class MAXFile:
"""Parent class for all MAX file classes, each of which will have clean
and preprocess functions"""
def __init__(
    self,
    ftype: str,
    year: int,
    state: str,
    data_root: str,
    index_col: str = "BENE_MSIS",
    clean: bool = True,
    preprocess: bool = True,
    tmp_folder: Optional[str] = None,
    pq_engine: str = "pyarrow",
) -> None:
    """
    Initializes MAX file object by preloading and preprocessing (if opted
    in) the file

    Parameters
    ----------
    ftype : {'ip', 'ot', 'rx', 'ps', 'cc'}
        MAX Claim type.
    year : int
        Year of claim file
    state : str
        State of claim file
    data_root : str
        Root folder of raw claim files
    index_col : str, default='BENE_MSIS'
        Index column name. Eg. BENE_MSIS or MSIS_ID. The raw file is
        expected to be already sorted with index column
    clean : bool, default=True
        Should the associated files be cleaned?
    preprocess : bool, default=True
        Should the associated files be preprocessed?
    tmp_folder : str, default=None
        Folder location to use for caching intermediate results. Can be
        turned off by not passing this argument.
    pq_engine : str, default='pyarrow'
        Parquet engine to use

    Raises
    ------
    FileNotFoundError
        Raised when raw claim files are missing
    """
    self.data_root = data_root
    # Resolve the parquet location for this (ftype, state, year) triple
    self.fileloc = links.get_max_parquet_loc(data_root, ftype, state, year)
    self.ftype = ftype
    self.index_col = index_col
    self.year = year
    self.state = state
    self.tmp_folder = tmp_folder
    if not os.path.exists(self.fileloc):
        raise FileNotFoundError(
            errno.ENOENT, os.strerror(errno.ENOENT), self.fileloc
        )
    self.pq_engine = pq_engine
    # Load without an index first; the index is set explicitly below
    self.df = dd.read_parquet(
        self.fileloc, index=False, engine=self.pq_engine
    )
    # HAS_BENE: 1 when BENE_ID is non-empty, else 0
    self.df = self.df.assign(
        HAS_BENE=(self.df["BENE_ID"].fillna("").str.len() > 0).astype(int)
    )
    sorted_index = True
    if "BENE_MSIS" not in self.df.columns:
        # BENE_MSIS must be constructed, so the file cannot be assumed
        # pre-sorted on it; cache before and after the construction
        self.index_col = self.df.index.name
        self.cache_results()
        sorted_index = False
        # BENE_MSIS = STATE_CD + "-" + HAS_BENE + "-" + (BENE_ID if
        # non-empty, otherwise MSIS_ID)
        self.df = self.df.map_partitions(
            lambda pdf: pdf.assign(
                BENE_MSIS=pdf["STATE_CD"]
                + "-"
                + pdf["HAS_BENE"].astype(str)
                + "-"
                + pdf["BENE_ID"]
                .where(pdf["BENE_ID"].fillna("").str.len() > 0, np.nan)
                .fillna(pdf["MSIS_ID"])
            )
        )
        self.cache_results()
        self.index_col = index_col
    self.df = self.df.set_index(self.index_col, sorted=sorted_index)
    # Cache the post-shuffle result for the largest states, where the
    # unsorted set_index is most expensive
    if (not sorted_index) and (self.state in ["FL", "NY", "TX", "CA"]):
        self.cache_results()
    self.lst_raw_col = list(self.df.columns)
    # This dictionary variable can be used to filter out data that will
    # not meet minimum quality expectations
    self.dct_default_filters = {}
    if clean:
        self.clean()
    if preprocess:
        self.preprocess()
[docs]
@classmethod
def get_claim_instance(
cls, claim_type: str, *args: Any, **kwargs: Any
) -> "MAXFile":
"""
Returns an instance of the requested claim type
Parameters
----------
claim_type : {'ip', 'ot', 'cc', 'rx'}
Claim type
*args : list
List of position arguments
**kwargs : dict
Dictionary of keyword arguments
"""
return next(
claim
for claim in cls.__subclasses__()
if claim.__name__ == f"MAX{claim_type.upper()}"
)(*args, **kwargs)
[docs]
def cache_results(
self, repartition: bool = False
) -> None:
"""
Save results in intermediate steps of some lengthy processing. Saving intermediate results speeds up
processing
Parameters
----------
repartition : bool, default=False
Repartition the dask dataframe
"""
if self.tmp_folder is not None:
# if repartition:
# self.df = self.df.repartition(
# partition_size="100MB"
# ) # Patch, currently to_parquet results
# # in error when any of the partitions is empty
# if not self.df.known_divisions:
# self.df = self.df.reset_index().set_index(self.index_col)
self.pq_export(self.tmp_folder, repartition)
[docs]
def pq_export(
    self, dest_path_and_fname: str, repartition: bool = False
) -> dd.DataFrame:
    """
    Export parquet files (overwrite safe)

    The dataframe is written to ``<dest>_tmp`` first and only swapped into
    place after the write succeeds, so an existing export is never left
    half-overwritten by a failed write.

    Parameters
    ----------
    dest_path_and_fname : str
        Destination path
    repartition : bool, default=False
        Repartition the dask dataframe

    Returns
    -------
    dd.DataFrame
        The dataframe re-read from the exported location
    """
    tmp_loc = dest_path_and_fname + "_tmp"
    shutil.rmtree(tmp_loc, ignore_errors=True)
    parent_dir = os.path.dirname(dest_path_and_fname)
    # Guard: dirname("") is returned for a bare filename and
    # os.makedirs("") raises FileNotFoundError
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    if repartition:
        self.df = self.df.repartition(partition_size="100MB", force=True)
    if not self.df.known_divisions:
        # Rebuild divisions by re-setting the (already sorted) index via a
        # temporary column, then restore the index name
        self.df = self.df.assign(
            **{f"new_{self.index_col}": self.df.index}
        ).set_index(f"new_{self.index_col}", sorted=True)
        self.df.index = self.df.index.rename(self.index_col)
    try:
        self.df.to_parquet(
            tmp_loc,
            engine=self.pq_engine,
            write_index=True,
        )
    except (ArrowInvalid, ArrowTypeError):
        # Mixed-type object columns (or an object-typed index) break
        # pyarrow serialization; coerce them to plain strings and retry once
        obj_cols = [
            c for c in self.df.columns if self.df[c].dtype == "object"
        ]
        if obj_cols:
            self.df = self.df.assign(
                **{c: self.df[c].astype(str) for c in obj_cols}
            )
        idx_name = self.df.index.name
        if idx_name is not None:
            self.df = self.df.reset_index()
            if self.df[idx_name].dtype == "object":
                self.df[idx_name] = self.df[idx_name].astype(str)
            self.df = self.df.set_index(idx_name)
        self.df.to_parquet(
            tmp_loc,
            engine=self.pq_engine,
            write_index=True,
        )
    # Drop the handle on the old data before replacing the folder on disk
    del self.df
    shutil.rmtree(dest_path_and_fname, ignore_errors=True)
    os.rename(tmp_loc, dest_path_and_fname)
    self.df = dd.read_parquet(
        dest_path_and_fname, index=False, engine=self.pq_engine
    ).set_index(self.index_col, sorted=True)
    return self.df
[docs]
def clean(self) -> None:
    """Run the base cleaning routines.

    Normalizes the raw date columns (constructing all commonly used
    date-derived variables) and derives the integer gender flag column.
    """
    self.process_date_cols()
    self.add_gender()
[docs]
def preprocess(self) -> None:
"""Add basic constructed variables"""
[docs]
def export(
    self, dest_folder: str, output_format: str = "csv", repartition: bool = False
) -> None:
    """
    Exports the files.

    Parameters
    ----------
    dest_folder : str
        Destination folder
    output_format : str, default='csv'
        Export format ('csv' or 'parquet'); any value other than 'csv'
        results in a parquet export
    repartition : bool, default=False
        Repartition the dask dataframe (used by the parquet export only)
    """
    if output_format == "csv":
        self.df.to_csv(
            os.path.join(
                dest_folder, f"{self.ftype}_{self.year}_{self.state}.csv"
            ),
            # Write the index only when it still is the configured index
            # column (it may have been replaced during construction)
            index=(self.df.index.name == self.index_col),
            single_file=True,
        )
    else:
        # Mirror the raw file's path relative to data_root under
        # dest_folder. NOTE(review): the str.split assumes data_root (with
        # a trailing separator) is a prefix of fileloc -- true when fileloc
        # came from links.get_max_parquet_loc, but confirm before reusing
        # with externally supplied paths.
        self.pq_export(
            os.path.join(
                dest_folder,
                self.fileloc.split(
                    self.data_root
                    + (
                        os.path.sep
                        if not self.data_root.endswith(os.path.sep)
                        else ""
                    )
                )[1],
            ),
            repartition=repartition,
        )
[docs]
def add_gender(self) -> None:
    """Adds an integer 'female' column derived from 'EL_SEX_CD': 1 for
    'F', 0 for 'M', and -1 for anything else (including the undefined
    value 'U' and missing values). No-op when 'EL_SEX_CD' is absent."""
    if "EL_SEX_CD" not in self.df.columns:
        return

    def _with_female(pdf):
        sex_code = pdf["EL_SEX_CD"].str.strip()
        return pdf.assign(
            female=np.select(
                [sex_code == "F", sex_code == "M"],
                [1, 0],
                default=-1,
            ).astype(int)
        )

    self.df = self.df.map_partitions(_with_female)
[docs]
def clean_diag_codes(self) -> None:
    """Clean diagnosis code columns (DIAG_CD_*) by removing
    non-alphanumeric characters and converting them to upper case"""
    if not any(col.startswith("DIAG_CD_") for col in self.df.columns):
        return

    def _clean_partition(pdf):
        cleaned = {
            col: pdf[col]
            .str.replace("[^a-zA-Z0-9]+", "", regex=True)
            .str.upper()
            for col in pdf.columns
            if col.startswith("DIAG_CD_")
        }
        return pdf.assign(**cleaned)

    self.df = self.df.map_partitions(_clean_partition)
[docs]
def clean_proc_codes(self) -> None:
    """Clean procedure code columns (PRCDR_CD*, excluding the coding
    system columns PRCDR_CD_SYS*) by removing non-alphanumeric characters
    and converting them to upper case"""

    def _is_proc_col(col: str) -> bool:
        # Procedure code columns only; PRCDR_CD_SYS* hold coding systems
        return col.startswith("PRCDR_CD") and not col.startswith(
            "PRCDR_CD_SYS"
        )

    if not any(_is_proc_col(col) for col in self.df.columns):
        return

    def _clean_partition(pdf):
        cleaned = {
            col: pdf[col]
            .str.replace("[^a-zA-Z0-9]+", "", regex=True)
            .str.upper()
            for col in pdf.columns
            if _is_proc_col(col)
        }
        return pdf.assign(**cleaned)

    self.df = self.df.map_partitions(_clean_partition)
[docs]
def process_date_cols(self) -> None:
    """
    Convert datetime columns to datetime type and add basic date based constructed variables

    New columns:
        - birth_year, birth_month, birth_day - date components of EL_DOB (date of birth)
        - birth_date - birth date (EL_DOB)
        - death - 0 or 1, if EL_DOD or MDCR_DOD is not empty and falls in the claim year or before
        - age - age in years, integer format
        - age_day - age in days
        - adult - 0 or 1, 1 when patient's age is in [18,115]
        - child - 0 or 1, 1 when patient's age is in [0,17]

    if ftype == 'ip':
        Clean/ impute admsn_date and add ip duration related columns

        New column(s):
            - admsn_date - Admission date (ADMSN_DT)
            - srvc_bgn_date - Service begin date (SRVC_BGN_DT)
            - srvc_end_date - Service end date (SRVC_END_DT)
            - prncpl_proc_date - Principal procedure date (PRNCPL_PRCDR_DT)
            - missing_admsn_date - 0 or 1, 1 when admission date is missing
            - missing_prncpl_proc_date - 0 or 1, 1 when principal procedure date is missing
            - flag_admsn_miss - 0 or 1, 1 when admsn_date was imputed
            - los - ip service duration in days
            - ageday_admsn - age in days as on admsn_date
            - age_admsn - age in years, with decimals, as on admsn_date
            - age_prncpl_proc - age in years as on principal procedure date
            - age_day_prncpl_proc - age in days as on principal procedure date

    if ftype == 'ot':
        Adds duration column, provided service end and begin dates are clean

        New Column(s):
            - srvc_bgn_date - Service begin date (SRVC_BGN_DT)
            - srvc_end_date - Service end date (SRVC_END_DT)
            - diff & duration - duration of service in days
            - age_day_srvc_bgn - age in days as on service begin date
            - age_srvc_bgn - age in years, with decimals, as on service begin date

    if ftype == 'ps':
        New Column(s):
            - date_of_death - Date of death (EL_DOD)
            - medicare_date_of_death - Medicare date of death (MDCR_DOD)
    """
    if self.ftype in ["ip", "lt", "ot", "ps", "rx"]:
        # 'year': claim year taken from MAX_YR_DT or YR_NUM, whichever
        # exists. NOTE(review): if both columns exist, the dict
        # comprehension produces a duplicate "year" key and the last one
        # (YR_NUM) silently wins -- confirm that is intended.
        df = self.df.assign(
            **{
                "year": self.df[col].astype(int)
                for col in ["MAX_YR_DT", "YR_NUM"]
                if col in self.df.columns
            }
        )
        # Raw date column -> constructed (snake_case) column name
        dct_date_col = {
            "EL_DOB": "birth_date",
            "ADMSN_DT": "admsn_date",
            "SRVC_BGN_DT": "srvc_bgn_date",
            "SRVC_END_DT": "srvc_end_date",
            "EL_DOD": "date_of_death",
            "MDCR_DOD": "medicare_date_of_death",
            "PRNCPL_PRCDR_DT": "prncpl_proc_date",
        }
        # Keep only the raw date columns present in this file type
        dct_date_col = {
            col: new_col_name
            for col, new_col_name in dct_date_col.items()
            if col in df.columns
        }
        # Duplicate the raw date columns under their constructed names
        df = df.assign(
            **{new_name: df[col] for col, new_name in dct_date_col.items()}
        )
        # converting lst_col columns to datetime type (only those not
        # already datetime typed)
        lst_col_to_convert = [
            new_name
            for col, new_name in dct_date_col.items()
            if (
                df[[dct_date_col[col]]]
                .select_dtypes(include=[np.datetime64])
                .shape[1]
                == 0
            )
        ]
        df = dataframe_utils.convert_ddcols_to_datetime(
            df, lst_col_to_convert
        )
        # Birth date components
        df = df.assign(
            birth_year=df.birth_date.dt.year,
            birth_month=df.birth_date.dt.month,
            birth_day=df.birth_date.dt.day,
        )
        # Age as of the claim year; implausible values (outside [0, 115])
        # are blanked out
        df = df.assign(age=df.year - df.birth_year)
        df = df.assign(
            age=df["age"].where(
                df["age"].between(0, 115, inclusive="both"), np.nan
            )
        )
        # Age in days as of Dec 31 of the claim year
        df = df.assign(
            age_day=(
                dd.to_datetime(
                    df.year.astype(str) + "1231", format="%Y%m%d"
                )
                - df.birth_date
            ).dt.days
        )
        # adult: age in [18, 115]; child: age in [0, 17]; both re-set to
        # -1 below wherever age is missing
        df = df.assign(
            adult=df["age"]
            .between(18, 115, inclusive="both")
            .astype(pd.Int64Dtype()),
            child=df["age"]
            .between(0, 17, inclusive="both")
            .astype(pd.Int64Dtype()),
        )
        df = df.assign(
            adult=df["adult"].where(~(df["age"].isna()), -1).astype(int),
            child=df["child"].where(~(df["age"].isna()), -1).astype(int),
        )
        if self.ftype != "ps":
            # Claim files can hold multiple rows per beneficiary; take the
            # max of the age variables within each index group.
            # NOTE(review): groupby is per-partition, so this assumes all
            # rows of a beneficiary share a partition -- confirm.
            df = df.map_partitions(
                lambda pdf: pdf.assign(
                    adult=pdf.groupby(pdf.index)["adult"].transform("max"),
                    age=pdf.groupby(pdf.index)["age"].transform("max"),
                    age_day=pdf.groupby(pdf.index)["age_day"].transform(
                        "max"
                    ),
                )
            )
        if "date_of_death" in df.columns:
            # death: 1 when either death date falls in or before the claim
            # year; missing years are filled with year+10 so they compare
            # false. NOTE(review): assumes medicare_date_of_death exists
            # whenever date_of_death does -- confirm for all file types.
            df = df.assign(
                death=(
                    (
                        df.date_of_death.dt.year.fillna(
                            df.year + 10
                        ).astype(int)
                        <= df.year
                    )
                    | (
                        df.medicare_date_of_death.dt.year.fillna(
                            df.year + 10
                        ).astype(int)
                        <= df.year
                    )
                ).astype(int)
            )
        if self.ftype == "ip":
            # Flag missing dates before imputing them
            df = df.assign(
                missing_admsn_date=df["admsn_date"].isnull().astype(int),
                missing_prncpl_proc_date=df["prncpl_proc_date"]
                .isnull()
                .astype(int),
            )
            # Impute missing admission date with the service begin date
            df = df.assign(
                admsn_date=df["admsn_date"].where(
                    ~df["admsn_date"].isnull(), df["srvc_bgn_date"]
                )
            )
            # Length of stay, inclusive of both endpoints; blanked when
            # the dates are inconsistent (admission after the claim year,
            # or admission after service end)
            df = df.assign(
                los=(df["srvc_end_date"] - df["admsn_date"]).dt.days + 1,
            )
            df = df.assign(
                los=df["los"].where(
                    (
                        (df["year"] >= df["admsn_date"].dt.year)
                        & (df["admsn_date"] <= df["srvc_end_date"])
                    ),
                    np.nan,
                )
            )
            # Impute missing principal procedure date with admission date
            df = df.assign(
                prncpl_proc_date=df["prncpl_proc_date"].where(
                    ~df["prncpl_proc_date"].isnull(), df["admsn_date"]
                )
            )
            # Ages (in days, then in whole years) as of the admission and
            # principal procedure dates
            df = df.assign(
                age_day_admsn=(
                    df["admsn_date"] - df["birth_date"]
                ).dt.days,
                age_day_prncpl_proc=(
                    df["prncpl_proc_date"] - df["birth_date"]
                ).dt.days,
            )
            df = df.assign(
                age_admsn=(df["age_day_admsn"].fillna(0) / 365.25).astype(
                    int
                ),
                age_prncpl_proc=(
                    df["age_day_prncpl_proc"].fillna(0) / 365.25
                ).astype(int),
            )
        if self.ftype == "ot":
            # Service duration; blanked when begin date is after end date
            df = df.assign(
                duration=(
                    df["srvc_end_date"] - df["srvc_bgn_date"]
                ).dt.days,
                age_day_srvc_bgn=(
                    df["srvc_bgn_date"] - df["birth_date"]
                ).dt.days,
            )
            df = df.assign(
                duration=df["duration"].where(
                    (df["srvc_bgn_date"] <= df["srvc_end_date"]), np.nan
                ),
                age_srvc_bgn=(
                    df["age_day_srvc_bgn"].fillna(0) / 365.25
                ).astype(int),
            )
        self.df = df
[docs]
def calculate_payment(self) -> None:
    """
    Calculates payment amount

    New Column(s):
        pymt_amt - "MDCD_PYMT_AMT" (total amount paid by Medicaid for the
        service) + "TP_PYMT_AMT" (total amount paid by a third party),
        with non-numeric values coerced to missing before the sum.
        CHRG_AMT (charge amount) is deliberately never used for cost
        analysis.
    """
    if self.ftype not in ["ot", "rx", "ip"]:
        return

    def _with_payment(pdf):
        amounts = pdf[["MDCD_PYMT_AMT", "TP_PYMT_AMT"]].apply(
            pd.to_numeric, errors="coerce"
        )
        return pdf.assign(pymt_amt=amounts.sum(axis=1))

    self.df = self.df.map_partitions(_with_payment)
[docs]
def flag_ed_use(self) -> None:
    """
    Detects ed use in claims

    New Column(s):
        - ed_cpt - 0 or 1, Claim has a procedure billed in ED code range (99281-99285)
          (PRCDR_CD_SYS_{1-6} == 01 & PRCDR_CD_{1-6} in (99281-99285))
        - ed_ub92 - 0 or 1, Claim has a revenue center codes (0450 - 0459, 0981) - UB_92_REV_CD_GP_{1-23}
        - ed_tos - 0 or 1, Claim has an outpatient type of service (MAX_TOS = 11) (if ftype == 'ip')
        - ed_pos - 0 or 1, Claim has PLC_OF_SRVC_CD set to 23 (if ftype == 'ot')
        - ed_use - any of ed_cpt, ed_ub92, ed_tos or ed_pos is 1
        - any_ed - 0 or 1, 1 when any other claim from the same visit has ed_use set to 1 (if ftype == 'ot')

    Uses the below as reference:
        - If the patient is a Medicare beneficiary, the general surgeon should bill the level of
          ED code (99281-99285) (https://web.archive.org/web/20231125185256/https://bulletin.facs.org/2013/02/coding-for-hospital-admission/)
        - Inpatient files: Revenue Center Codes 0450-0459, 0981 (https://web.archive.org/web/20210303085851/https://www.resdac.org/resconnect/articles/144)
    """
    # reference: If the patient is a Medicare beneficiary, the general surgeon should bill the level of
    # ED code (99281-99285). https://web.archive.org/web/20231125185256/https://bulletin.facs.org/2013/02/coding-for-hospital-admission/
    if self.ftype in ["ot", "ip"]:
        # ed_cpt: 1 when any procedure code column (PRCDR_CD*, excluding
        # the PRCDR_CD_SYS* coding-system columns) starts with an ED CPT
        # code (99281-99285)
        self.df = self.df.map_partitions(
            lambda pdf: pdf.assign(
                **{
                    "ed_cpt": np.column_stack(
                        [
                            pdf[col].str.startswith(
                                (
                                    "99281",
                                    "99282",
                                    "99283",
                                    "99284",
                                    "99285",
                                ),
                                na=False,
                            )
                            for col in pdf.columns
                            if col.startswith(("PRCDR_CD",))
                            and (not col.startswith(("PRCDR_CD_SYS",)))
                        ]
                    )
                    .any(axis=1)
                    .astype(int)
                }
            )
        )
        if self.ftype == "ip":
            # Inpatient files: Revenue Center Codes 0450-0459, 0981,
            # https://web.archive.org/web/20210303085851/https://www.resdac.org/resconnect/articles/144
            # TOS - Type of Service
            # 11=outpatient hospital ???? not every IP which called outpatient hospital is called ED,
            # this may end up with too many ED
            self.df = self.df.map_partitions(
                lambda pdf: pdf.assign(
                    **{
                        # ed_ub92: any UB-92 revenue group column holds an
                        # ED revenue center code
                        "ed_ub92": np.column_stack(
                            [
                                pd.to_numeric(
                                    pdf[col], errors="coerce"
                                ).isin(
                                    [
                                        450,
                                        451,
                                        452,
                                        453,
                                        454,
                                        455,
                                        456,
                                        457,
                                        458,
                                        459,
                                        981,
                                    ]
                                )
                                for col in pdf.columns
                                if col.startswith("UB_92_REV_CD_GP_")
                            ]
                        )
                        .any(axis=1)
                        .astype(int),
                        # ed_tos: outpatient-hospital type of service
                        "ed_tos": (
                            pd.to_numeric(pdf["MAX_TOS"], errors="coerce")
                            == 11
                        ).astype(int),
                    }
                )
            )
            self.df = self.df.assign(
                ed_use=self.df[["ed_ub92", "ed_cpt", "ed_tos"]]
                .any(axis="columns")
                .astype(int)
            )
        else:
            # NOTE(review): the OT branch checks only a subset of the UB-92
            # codes used in the IP branch (450,451,452,456,459,981; the
            # original "????" comment flags the same doubt) -- confirm the
            # omission of 453-455, 457, 458 is intentional.
            self.df = self.df.map_partitions(
                lambda pdf: pdf.assign(
                    **{
                        "ed_ub92": pd.to_numeric(
                            pdf["UB_92_REV_CD"], errors="coerce"
                        )
                        .isin([450, 451, 452, 456, 459, 981])
                        .astype(int),
                        # ed_pos: place of service 23 = emergency room
                        "ed_pos": (
                            pd.to_numeric(
                                pdf["PLC_OF_SRVC_CD"],
                                errors="coerce",
                            )
                            == 23
                        ).astype(int),
                    }
                )
            )
            self.df = self.df.assign(
                ed_use=self.df[["ed_pos", "ed_cpt", "ed_ub92"]]
                .any(axis="columns")
                .astype(int)
            )
            # check ED use in other claims from the same visit: any_ed is
            # the max of ed_use within each (beneficiary, srvc_bgn_date)
            # group, computed per partition
            self.df = self.df.map_partitions(
                lambda pdf: pdf.assign(
                    any_ed=pdf.groupby([pdf.index, "srvc_bgn_date"])[
                        "ed_use"
                    ].transform("max")
                )
            )