# Source code for medicaid_utils.preprocessing.max_ot

"""This module has MAXOT class which wraps together cleaning/ preprocessing routines specific for MAX OT files"""
from typing import Optional

import dask.dataframe as dd
import pandas as pd

from medicaid_utils.preprocessing import max_file
from medicaid_utils.common_utils import dataframe_utils


class MAXOT(max_file.MAXFile):
    """Wraps together cleaning/ preprocessing routines specific to MAX OT
    (Other Therapy) claim files"""

    def __init__(
        self,
        year: int,
        state: str,
        data_root: str,
        index_col: str = "BENE_MSIS",
        clean: bool = True,
        preprocess: bool = True,
        tmp_folder: Optional[str] = None,
        pq_engine: str = "pyarrow",
    ) -> None:
        """
        Initializes MAX OT file object by preloading and preprocessing(if
        opted in) the file

        Parameters
        ----------
        year : int
            Year of claim file
        state : str
            State of claim file
        data_root : str
            Root folder of raw claim files
        index_col : str, default='BENE_MSIS'
            Index column name. Eg. BENE_MSIS or MSIS_ID. The raw file is
            expected to be already sorted with index column
        clean : bool, default=True
            Should the associated files be cleaned?
        preprocess : bool, default=True
            Should the associated files be preprocessed?
        tmp_folder : str, default=None
            Folder location to use for caching intermediate results. Can be
            turned off by not passing this argument.
        pq_engine : str, default='pyarrow'
            Parquet engine to use

        """
        # clean/ preprocess are forced off in the base-class call so that
        # this class' own clean()/ preprocess() overrides (run below) are
        # used instead of the base-class versions.
        super().__init__(
            "ot",
            year=year,
            state=state,
            data_root=data_root,
            index_col=index_col,
            clean=False,
            preprocess=False,
            tmp_folder=tmp_folder,
            pq_engine=pq_engine,
        )
        # Default exclusion filters for OT claims; keys correspond to the
        # excl_* flag columns added by the cleaning routines below.
        self.dct_default_filters = {
            "missing_dob": 0,
            "duplicated": 0,
            "missing_srvc_bgn_date": 0,
        }
        if clean:
            self.clean()
        if preprocess:
            self.preprocess()
[docs] def clean(self) -> None: """Runs cleaning routines and adds common exclusion flags based on default filters""" super().clean() self.cache_results() self.clean_diag_codes() self.clean_proc_codes() self.flag_common_exclusions() self.flag_duplicates() self.cache_results()
[docs] def preprocess(self) -> None: """Adds payment, ed use, transport, dental, and em flags""" self.calculate_payment() self.flag_ed_use() self.flag_transport() self.flag_dental() self.flag_em() self.cache_results()
    def flag_ip_overlaps_and_ed(self, df_ip: pd.DataFrame) -> None:
        """
        Adds flags to indicate overlaps with IP claims

        Parameters
        ----------
        df_ip : pd.DataFrame
            IP claim dataframe
        """
        # NOTE(review): this signature annotates df_ip as a pandas
        # DataFrame while find_ot_ip_overlaps annotates it as a dask
        # DataFrame — confirm which type callers actually pass.
        # Compute the `overlap` column first, ...
        self.find_ot_ip_overlaps(df_ip)
        # ... then derive ip_incl/ ed_incl/ ot_incl flags from it.
        self.add_ot_flags()
        self.cache_results()
[docs] def flag_common_exclusions(self) -> None: self.df = dataframe_utils.fix_index( self.df, self.index_col, drop_column=True ) def _flag_excl(pdf): php = pd.to_numeric(pdf["PHP_TYPE"], errors="coerce") clm = pd.to_numeric(pdf["TYPE_CLM_CD"], errors="coerce") is_encounter = (php == 77) | ((php == 88) & (clm == 3)) is_capitation = (php == 88) & (clm == 2) return pdf.assign( excl_missing_dob=pdf["birth_date"].isnull().astype(int), excl_missing_srvc_bgn_date=pdf["srvc_bgn_date"] .isnull() .astype(int), excl_encounter_claim=is_encounter.astype(int), excl_capitation_claim=is_capitation.astype(int), excl_ffs_claim=( ~(is_encounter | is_capitation) ).astype(int), excl_female=(pdf["female"] == 1).astype(int), ) self.df = self.df.map_partitions(_flag_excl)
[docs] def flag_duplicates(self) -> None: self.df = dataframe_utils.fix_index(self.df, self.index_col, True) self.df = self.df.map_partitions( lambda pdf: pdf.assign( excl_duplicated=pdf.assign(_index_col=pdf.index)[ [col for col in pdf.columns if col != "excl_duplicated"] ] .duplicated(keep="first") .astype(int) ) )
[docs] def flag_em(self) -> None: """ Flag claim if procedure code belongs to E/M category New Column(s): EM - 0 or 1, 1 when PRCDR_CD in [99201, 99215] or [99301, 99350] """ def _flag_em(pdf): prcdr = pd.to_numeric(pdf["PRCDR_CD"], errors="coerce") prcdr_sys = pd.to_numeric( pdf["PRCDR_CD_SYS"], errors="coerce" ) return pdf.assign( EM=( (prcdr_sys == 1) & ( prcdr.between(99201, 99205, inclusive="both") | prcdr.between(99211, 99215, inclusive="both") | prcdr.between(99381, 99387, inclusive="both") | prcdr.between(99391, 99397, inclusive="both") ) ).astype(int) ) self.df = self.df.map_partitions(_flag_em)
[docs] def flag_dental(self) -> None: """ Flag dental claims New Column(s): dental_TOS - 0 or 1, 1 when MAX_TOS = 9 dental_PRCDR - 0 or 1, 1 when PRCDR_CD starts with 'D' dental - 0 or 1, 1 when any of dental_TOS or dental_PRCDR """ dental_tos = ( dd.to_numeric(self.df["MAX_TOS"], errors="coerce") == 9 ) dental_proc = ( self.df.PRCDR_CD.astype(str).str.slice(stop=1) == "D" ) self.df = self.df.assign( dental_TOS=dental_tos.astype(int), dental_PROC=dental_proc.astype(int), dental=(dental_tos | dental_proc).astype(int), )
[docs] def flag_transport(self) -> None: """ Flag transport claims New Column(s): transport_TOS - 0 or 1, 1 when MAX_TOS = 26 transport_POS - 0 or 1, 1 when PLC_OF_SRVC_CD is 41 or 42 transport - 0 or 1, 1 when any of transport_TOS or transport_POS """ self.df = self.df.map_partitions( lambda pdf: pdf.assign( transport_TOS=( pd.to_numeric( pdf["MAX_TOS"], errors="coerce" # TOS: TYPE OF SERVICE ) == 26 ).astype(int), transport_POS=pd.to_numeric( pdf["PLC_OF_SRVC_CD"], # # 41 = AMBULANCE - LAND 42 = AMBULANCE - AIR OR WATER errors="coerce", ) .isin([41, 42]) .astype(int), ) ) self.df["transport"] = ( self.df[["transport_TOS", "transport_POS"]] .any(axis="columns") .astype(int) )
    def find_ot_ip_overlaps(self, df_ip: dd.DataFrame) -> None:
        """
        Checks for OT claims that have an overlapping IP claim

        An OT claim overlaps an IP claim of the same beneficiary when its
        service begin or end date falls within the IP claim's admission to
        service end date window.

        New Column(s):
            overlap - 0 or 1, 1 when OT claim has an overlapping IP claim

        Parameters
        ----------
        df_ip : dd.DataFrame
            IP claim dataframe
        """
        # Pair every OT claim with every IP claim of the same beneficiary
        # (index-on-index left merge); repartition first to keep the
        # fan-out of the merge manageable.
        df_ot_ip = (
            self.df[["srvc_bgn_date", "srvc_end_date"]]
            .repartition(partition_size="10MB")
            .merge(
                df_ip[["admsn_date", "srvc_end_date"]].rename(
                    columns={"srvc_end_date": "ip_srvc_end_date"}
                ),
                left_index=True,
                right_index=True,
                how="left",
            )
        )
        df_ot_ip = dataframe_utils.fix_index(df_ot_ip, self.index_col)
        # Flag a pairing as an overlap when either endpoint of the OT
        # claim lies within the IP [admsn_date, ip_srvc_end_date] window.
        df_ot_ip = df_ot_ip.map_partitions(
            lambda pdf: pdf.assign(
                overlap=(
                    pdf["srvc_bgn_date"].between(
                        pdf["admsn_date"],
                        pdf["ip_srvc_end_date"],
                        inclusive="both",
                    )
                    | pdf["srvc_end_date"].between(
                        pdf["admsn_date"],
                        pdf["ip_srvc_end_date"],
                        inclusive="both",
                    )
                ).astype(int)
            )
        )
        # Collapse the many IP pairings of each OT claim back to one row
        # per (bene, srvc_bgn_date, srvc_end_date): overlap = 1 if ANY
        # paired IP claim overlapped. NOTE(review): the groupby runs per
        # partition — assumes fix_index co-located all rows of a
        # beneficiary in one partition.
        df_ot_ip = df_ot_ip.map_partitions(
            lambda pdf: pdf.groupby(
                [self.index_col, "srvc_bgn_date", "srvc_end_date"]
            )["overlap"]
            .max()
            .reset_index(drop=False)
            .set_index(self.index_col)
        )
        df_ot_ip = dataframe_utils.fix_index(df_ot_ip, self.index_col)
        # Materialize the (small, deduplicated) overlap lookup table so it
        # can be merged back as a pandas frame.
        df_ot_ip = df_ot_ip.compute()
        # Drop any stale `overlap` column before re-attaching the fresh
        # one via inner merge on bene + service dates.
        self.df = self.df[self.df.columns.difference(["overlap"])].merge(
            df_ot_ip,
            on=[self.index_col, "srvc_bgn_date", "srvc_end_date"],
            how="inner",
        )
        self.df = dataframe_utils.fix_index(self.df, self.index_col)
[docs] def add_ot_flags(self) -> None: """ Assign flags for IP, OT and ED calculation Based on hierarchical principal: IP first,then ED, and then OT Marks claims that have overlapping IP claims, has ED services or have only OT services New Column(s): ip_incl - 0 or 1, 1 when has no dental and transport claims, and has an overlapping IP claim ed_incl - 0 or 1, 1 when has no dental and transport claims, has no overlapping IP claim, and has an ED service in any visits corresponding this claim ot_incl - 0 or 1, 1 when has no dental and transport claims, has no overlapping IP claim, and has no ED service in any visits corresponding this claim flag_drop - 0 or 1, 1 when ip_incl, ed_incl and ot_incl are all null """ self.df["ip_incl"] = 0 self.df["ed_incl"] = 0 self.df["ed_incl_dental"] = 0 self.df["ot_incl"] = 0 transport_mask = self.df["transport"] != 1 dental_mask = self.df["dental"] != 1 overlap_mask = self.df["overlap"] == 1 ed_claim_mask = self.df["any_ed"] == 1 duration_mask = (self.df["duration"] >= 0) | ( self.df["srvc_end_date"].isna() & (self.df.srvc_bgn_date.dt.year <= self.df.year) ) ip_mask = transport_mask & dental_mask & overlap_mask ed_mask = ( transport_mask & dental_mask & (~overlap_mask) & duration_mask & ed_claim_mask ) ed_mask_dental_allowed = ( transport_mask & (~overlap_mask) & duration_mask & ed_claim_mask ) ot_mask = ( transport_mask & dental_mask & (~overlap_mask) & duration_mask & (~ed_claim_mask) ) self.df["ip_incl"] = self.df["ip_incl"].where(~ip_mask, 1) self.df["ed_incl"] = self.df["ed_incl"].where(~ed_mask, 1) self.df["ed_incl_dental"] = self.df["ed_incl_dental"].where( ~ed_mask_dental_allowed, 1 ) self.df["ot_incl"] = self.df["ot_incl"].where(~ot_mask, 1) self.df["flag_drop"] = ( self.df[["ip_incl", "ed_incl", "ot_incl"]].sum(axis=1) < 1 ).astype(int)