"""This module has MAXIP class which wraps together cleaning/ preprocessing routines specific for MAX IP files"""
from typing import Optional
import numpy as np
import pandas as pd
from medicaid_utils.preprocessing import max_file
from medicaid_utils.common_utils import dataframe_utils
class MAXIP(max_file.MAXFile):
    """MAX IP (inpatient) claim file class: adds IP-specific cleaning and
    preprocessing routines on top of the generic :class:`max_file.MAXFile`."""

    def __init__(
        self,
        year: int,
        state: str,
        data_root: str,
        index_col: str = "BENE_MSIS",
        clean: bool = True,
        preprocess: bool = True,
        tmp_folder: Optional[str] = None,
        pq_engine: str = "pyarrow",
    ) -> None:
        """
        Initializes MAX IP file object by preloading and preprocessing (if opted in) the file

        Parameters
        ----------
        year : int
            Year of claim file
        state : str
            State of claim file
        data_root : str
            Root folder of raw claim files
        index_col : str, default='BENE_MSIS'
            Index column name. Eg. BENE_MSIS or MSIS_ID. The raw file is expected to be already
            sorted with index column
        clean : bool, default=True
            Should the associated files be cleaned?
        preprocess : bool, default=True
            Should the associated files be preprocessed?
        tmp_folder : str, default=None
            Folder location to use for caching intermediate results. Can be turned off by not passing this argument.
        pq_engine : str, default='pyarrow'
            Parquet engine to use
        """
        # Load the raw file with base-class clean/preprocess disabled; this
        # class's own clean()/preprocess() are invoked explicitly below so
        # the IP-specific overrides run.
        super().__init__(
            "ip",
            year=year,
            state=state,
            data_root=data_root,
            index_col=index_col,
            clean=False,
            preprocess=False,
            tmp_folder=tmp_folder,
            pq_engine=pq_engine,
        )
        self.dct_default_filters = {"missing_dob": 0, "duplicated": 0}
        if clean:
            self.clean()
        if preprocess:
            self.preprocess()

    def clean(self) -> None:
        """Runs cleaning routines and adds common exclusion flags based on default filters"""
        super().clean()
        self.clean_diag_codes()
        self.clean_proc_codes()
        self.flag_common_exclusions()
        self.flag_duplicates()

    def preprocess(self) -> None:
        """Adds payment, ed use, and overlap flags"""
        self.calculate_payment()
        self.flag_ed_use()
        self.flag_ip_overlaps()

    def flag_common_exclusions(self) -> None:
        """Adds 0/1 exclusion flag columns commonly used to filter IP claims.

        New Column(s):
            excl_missing_dob, excl_missing_admsn_date,
            excl_missing_prncpl_proc_date, excl_encounter_claim,
            excl_capitation_claim, excl_ffs_claim, excl_delivery, excl_female

        :rtype: None
        """

        def _flag_excl(pdf):
            # PHP_TYPE == 77, or PHP_TYPE == 88 with TYPE_CLM_CD == 3, is
            # treated as an encounter record; PHP_TYPE == 88 with
            # TYPE_CLM_CD == 2 as a capitation claim; everything else as
            # FFS. NOTE(review): code values presumably follow the MAX
            # data dictionary — confirm.
            php = pd.to_numeric(pdf["PHP_TYPE"], errors="coerce")
            clm = pd.to_numeric(pdf["TYPE_CLM_CD"], errors="coerce")
            is_encounter = (php == 77) | ((php == 88) & (clm == 3))
            is_capitation = (php == 88) & (clm == 2)
            return pdf.assign(
                excl_missing_dob=pdf["birth_date"].isnull().astype(int),
                excl_missing_admsn_date=pdf["admsn_date"]
                .isnull()
                .astype(int),
                excl_missing_prncpl_proc_date=pdf["prncpl_proc_date"]
                .isnull()
                .astype(int),
                excl_encounter_claim=is_encounter.astype(int),
                excl_capitation_claim=is_capitation.astype(int),
                excl_ffs_claim=(
                    ~(is_encounter | is_capitation)
                ).astype(int),
                excl_delivery=(
                    pd.to_numeric(
                        pdf["RCPNT_DLVRY_CD"], errors="coerce"
                    )
                    == 1
                ).astype(int),
                excl_female=(pdf["female"] == 1).astype(int),
            )

        self.df = self.df.map_partitions(_flag_excl)

    def flag_duplicates(self) -> None:
        """Flags rows that are exact duplicates of an earlier row.

        The comparison covers every column plus the index (materialized as
        a temporary ``_index_col``), excluding any pre-existing
        ``excl_duplicated`` column so the method is idempotent.

        New Column(s):
            excl_duplicated - 0 or 1, 1 when the row duplicates an earlier row

        :rtype: None
        """
        self.df = dataframe_utils.fix_index(self.df, self.index_col, True)
        # Bug fix: previously the selection iterated the ORIGINAL pdf.columns,
        # which silently dropped the freshly assigned _index_col, so the index
        # never took part in the duplicate check. _index_col is now appended
        # to the selection explicitly.
        self.df = self.df.map_partitions(
            lambda pdf: pdf.assign(
                excl_duplicated=pdf.assign(_index_col=pdf.index)[
                    [col for col in pdf.columns if col != "excl_duplicated"]
                    + ["_index_col"]
                ]
                .duplicated(keep="first")
                .astype(int)
            )
        )

    def flag_ip_overlaps(self) -> None:
        """
        Identifies duplicate/ overlapping claims.
        When several/ overlapping claims exist with the same MSIS_ID, claim with the largest payment amount is retained.

        New Column(s):
            flag_ip_undup - 0 or 1, 1 when row is not a duplicate
            flag_ip_dup_drop - 0 or 1, 1 when row is duplicate and must be dropped
            flag_overlap_drop - 0 or 1, 1 when row overlaps with another claim and must be dropped
            ip_incl - 0 or 1, 1 when row is clean (flag_ip_dup_drop = 0 & flag_overlap_drop = 0) and has los > 0

        :rtype: None
        """
        # Work on a copy with the index duplicated into a regular column so
        # per-partition groupbys can use it.
        df_flagged = self.df.assign(**{self.index_col: self.df.index})
        # Pre-create the output columns (NaN / NaT) so the partition function
        # and the dask meta agree on the schema.
        df_flagged["flag_ip_dup_drop"] = np.nan
        df_flagged["flag_ip_undup"] = np.nan
        df_flagged["admsntime"] = np.nan
        df_flagged["next_admsn_date"] = pd.to_datetime(np.nan, errors="coerce")
        df_flagged["last_admsn_date"] = pd.to_datetime(np.nan, errors="coerce")
        df_flagged["next_pymt_amt"] = np.nan
        df_flagged["last_pymt_amt"] = np.nan
        df_flagged["next_srvc_end_date"] = pd.to_datetime(
            np.nan, errors="coerce"
        )
        df_flagged["last_srvc_end_date"] = pd.to_datetime(
            np.nan, errors="coerce"
        )

        def _mptn_check_ip_overlaps(pdf_partition: pd.DataFrame) -> pd.DataFrame:
            pdf_partition = pdf_partition.reset_index(drop=True)
            # check duplicate claims (same ID, admission date), flag the largest payment amount
            pdf_partition = pdf_partition.sort_values(
                by=[self.index_col, "admsn_date", "pymt_amt", "los"],
                ascending=[True, True, False, False],
            )
            # Within (ID, admission date, los): only the highest-paying claim
            # keeps flag_ip_dup_drop = 0 (rank 'first' breaks payment ties).
            pdf_partition["flag_ip_dup_drop"] = (
                pdf_partition.groupby([self.index_col, "admsn_date", "los"])[
                    "pymt_amt"
                ].rank(method="first", ascending=False)
                != 1
            ).astype(int)
            # Row is "undup" when it is the only claim for its (ID, admsn_date).
            pdf_partition["flag_ip_undup"] = (
                pdf_partition.groupby([self.index_col, "admsn_date"])[
                    self.index_col
                ].transform("count")
                == 1
            ).astype(int)
            # Overlap checks only consider non-duplicate claims with los > 0.
            overlap_mask = (
                pd.to_numeric(pdf_partition["los"], errors="coerce") > 0
            ) & (pdf_partition["flag_ip_dup_drop"] != 1)
            pdf_partition.loc[overlap_mask, "admsntime"] = (
                pdf_partition.loc[
                    overlap_mask,
                ]
                .groupby(self.index_col)["admsn_date"]
                .rank(method="dense")
            )
            pdf_partition = pdf_partition.sort_values(
                by=[self.index_col, "admsntime"]
            )
            # Attach previous/next admission, payment, and service-end values
            # per beneficiary so overlap flags can be computed row-wise later.
            pdf_partition.loc[overlap_mask, "next_admsn_date"] = (
                pdf_partition.loc[
                    overlap_mask,
                ]
                .groupby(self.index_col)["admsn_date"]
                .shift(-1)
            )
            pdf_partition.loc[overlap_mask, "next_pymt_amt"] = (
                pdf_partition.loc[
                    overlap_mask,
                ]
                .groupby(self.index_col)["pymt_amt"]
                .shift(-1)
            )
            pdf_partition.loc[overlap_mask, "next_srvc_end_date"] = (
                pdf_partition.loc[
                    overlap_mask,
                ]
                .groupby(self.index_col)["srvc_end_date"]
                .shift(-1)
            )
            pdf_partition.loc[overlap_mask, "last_admsn_date"] = (
                pdf_partition.loc[
                    overlap_mask,
                ]
                .groupby(self.index_col)["admsn_date"]
                .shift(1)
            )
            pdf_partition.loc[overlap_mask, "last_pymt_amt"] = (
                pdf_partition.loc[
                    overlap_mask,
                ]
                .groupby(self.index_col)["pymt_amt"]
                .shift(1)
            )
            pdf_partition.loc[overlap_mask, "last_srvc_end_date"] = (
                pdf_partition.loc[
                    overlap_mask,
                ]
                .groupby(self.index_col)["srvc_end_date"]
                .shift(1)
            )
            pdf_partition = pdf_partition.sort_values(
                by=[self.index_col, "admsn_date", "pymt_amt"]
            )
            pdf_partition = pdf_partition.set_index(self.index_col, drop=False)
            return pdf_partition

        df_flagged = df_flagged.map_partitions(
            _mptn_check_ip_overlaps, meta=df_flagged.head(1)
        )
        # A claim overlaps its neighbor when it ends after the next admission
        # starts, or starts before the previous claim ends.
        df_flagged["flag_overlap_next"] = (
            df_flagged["next_admsn_date"].notnull()
            & (df_flagged["srvc_end_date"] > df_flagged["next_admsn_date"])
        ).astype(int)
        df_flagged["flag_overlap_last"] = (
            df_flagged["last_admsn_date"].notnull()
            & (df_flagged["admsn_date"] < df_flagged["last_srvc_end_date"])
        ).astype(int)
        # Of two overlapping claims, drop the lower-paying one (ties drop the
        # later row via the <= comparison against the previous claim).
        df_flagged["flag_overlap_drop"] = (
            (
                (df_flagged["flag_overlap_next"] == 1)
                & (df_flagged["pymt_amt"] < df_flagged["next_pymt_amt"])
            )
            | (
                (df_flagged["flag_overlap_last"] == 1)
                & (df_flagged["pymt_amt"] <= df_flagged["last_pymt_amt"])
            )
        ).astype(int)
        df_flagged["ip_incl"] = (
            (df_flagged["los"].astype(float) > 0)
            & (df_flagged["flag_ip_dup_drop"] != 1)
            & (df_flagged["flag_overlap_drop"] != 1)
        ).astype(int)
        df_flagged = dataframe_utils.fix_index(
            df_flagged, self.index_col, True
        )
        self.df = df_flagged