# Source code for medicaid_utils.preprocessing.taf_file

"""This module has TAFFile class from which is the base class for all TAF
file type classes"""
import os
import errno
import shutil
import logging
from typing import Any, Optional

import numpy as np
import dask.dataframe as dd
import pandas as pd
from pyarrow.lib import ArrowInvalid, ArrowTypeError

from medicaid_utils.common_utils import dataframe_utils, links


class TAFFile:
    """Parent class for all TAF file classes, each of which will have clean
    and preprocess functions"""

    def __init__(
        self,
        ftype: str,
        year: int,
        state: str,
        data_root: str,
        index_col: str = "BENE_MSIS",
        clean: bool = True,
        preprocess: bool = True,
        tmp_folder: Optional[str] = None,
        pq_engine: str = "pyarrow",
    ) -> None:
        """
        Initializes TAF file object by preloading and preprocessing (if
        opted in) the associated files

        Parameters
        ----------
        ftype : {'ip', 'ot', 'rx', 'ps'}
            TAF Claim type.
        year : int
            Year of claim file
        state : str
            State of claim file
        data_root : str
            Root folder of raw claim files
        index_col : str, default='BENE_MSIS'
            Index column name. Eg. BENE_MSIS or MSIS_ID. The raw file is
            expected to be already sorted with index column
        clean : bool, default=True
            Should the associated files be cleaned?
        preprocess : bool, default=True
            Should the associated files be preprocessed?
        tmp_folder : str, default=None
            Folder location to use for caching intermediate results. Can be
            turned off by not passing this argument.
        pq_engine : str, default='pyarrow'
            Parquet engine to use

        Raises
        ------
        FileNotFoundError
            Raised when any of the subtype files are missing

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_file import TAFFile  # doctest: +SKIP
        >>> taf = TAFFile('ip', 2019, 'AL', '/data/cms')  # doctest: +SKIP
        """
        self.data_root = data_root
        # Mapping of subtype name -> parquet location for this
        # ftype/state/year combination (resolved by the shared links helper)
        self.dct_fileloc = links.get_taf_parquet_loc(
            data_root, ftype, state, year
        )
        self.ftype = ftype
        self.index_col = index_col
        self.year = year
        self.state = state
        self.tmp_folder = tmp_folder
        self.pq_engine = pq_engine
        # Subtypes that may legitimately be absent on disk without raising
        self.allowed_missing_ftypes = [
            "occurrence_code",
            "disability",
            "mfp",
            "waiver",
            "home_health",
            "managed_care",
            "line_ndc_codes",
            "base_diag_codes",
            "diag_and_ndc_codes",
        ]
        # Derived subtypes produced by this package (not part of the raw
        # TAF release); their absence is expected, so it is not logged
        self.non_core_ftypes = [
            "line_ndc_codes",
            "base_diag_codes",
            "diag_and_ndc_codes",
        ]
        # Drop allowed-missing subtypes that do not exist; raise for any
        # other missing subtype
        for subtype in list(self.dct_fileloc.keys()):
            if not os.path.exists(self.dct_fileloc[subtype]):
                if subtype not in self.non_core_ftypes:
                    logging.info("%s does not exist for %s", subtype, state)
                if subtype not in self.allowed_missing_ftypes:
                    raise FileNotFoundError(
                        errno.ENOENT,
                        os.strerror(errno.ENOENT),
                        self.dct_fileloc[subtype],
                    )
                self.dct_fileloc.pop(subtype)
        self.dct_files = {}
        has_bene = True
        sorted_index = True  # if (year not in [2015, 2016]) else False
        # NOTE: the loop variable below shadows the `ftype` parameter;
        # self.ftype was already captured above, so this is safe here
        for ftype, file_loc in self.dct_fileloc.items():
            df = dd.read_parquet(file_loc, index=False, engine=self.pq_engine)
            if "BENE_MSIS" not in df.columns:
                # Older files lack the combined BENE_MSIS key; construct it
                # as STATE_CD-<has_bene flag>-<BENE_ID or MSIS_ID fallback>
                df = df.assign(
                    HAS_BENE=(df["BENE_ID"].fillna("").str.len() > 0).astype(
                        int
                    )
                )
                self.index_col = df.index.name
                has_bene = False
                sorted_index = False
                df = df.map_partitions(
                    lambda pdf: pdf.assign(
                        BENE_MSIS=pdf["STATE_CD"]
                        + "-"
                        + pdf["HAS_BENE"].astype(str)
                        + "-"
                        + pdf["BENE_ID"]
                        .where(pdf["BENE_ID"].fillna("").str.len() > 0, np.nan)
                        .fillna(pdf["MSIS_ID"])
                    )
                )
            self.dct_files[ftype] = df
        if not has_bene:
            # Materialize the constructed BENE_MSIS column before
            # switching the index over to it
            self.cache_results()
            self.index_col = index_col
        self.dct_files = {
            ftype: self.dct_files[ftype].set_index(
                self.index_col, sorted=sorted_index
            )
            for ftype, _ in self.dct_fileloc.items()
        }
        if not sorted_index:
            # Persist the (expensive) shuffle performed by set_index
            self.cache_results()
        self.dct_collist = {
            ftype: list(claim_df.columns)
            for ftype, claim_df in self.dct_files.items()
        }
        # This dictionary variable can be used to filter out data that will
        # not meet minimum quality expectations
        self.dct_default_filters = {}
        if clean:
            self.clean()
        if preprocess:
            self.preprocess()
[docs] @classmethod def get_claim_instance( cls, claim_type: str, *args: Any, **kwargs: Any ) -> "TAFFile": """ Returns an instance of the requested claim type Parameters ---------- claim_type : {'ip', 'ot', 'cc', 'rx'} Claim type *args : list List of position arguments **kwargs : dict Dictionary of keyword arguments Examples -------- >>> from medicaid_utils.preprocessing.taf_file import TAFFile # doctest: +SKIP >>> ip_claim = TAFFile.get_claim_instance('ip', 2019, 'AL', '/data/cms') # doctest: +SKIP """ return next( claim for claim in cls.__subclasses__() if claim.__name__ == f"TAF{claim_type.upper()}" )(*args, **kwargs)
[docs] def add_custom_subtype(self, subtype_name: str, df_file: dd.DataFrame) -> None: """Add custom subtype file to claim object. Examples -------- >>> from medicaid_utils.preprocessing.taf_file import TAFFile # doctest: +SKIP >>> taf = TAFFile('ip', 2019, 'AL', '/data/cms') # doctest: +SKIP >>> import dask.dataframe as dd # doctest: +SKIP >>> import pandas as pd # doctest: +SKIP >>> df = dd.from_pandas(pd.DataFrame({'col': [1]}), npartitions=1) # doctest: +SKIP >>> taf.add_custom_subtype('my_subtype', df) # doctest: +SKIP """ self.dct_fileloc[subtype_name] = links.get_taf_parquet_loc( self.data_root, self.ftype, self.state, self.year )[subtype_name] self.dct_files[subtype_name] = df_file
[docs] def cache_results( self, subtype: Optional[str] = None, repartition: bool = False ) -> None: """ Save results in intermediate steps of some lengthy processing. Saving intermediate results speeds up processing, and avoid dask cluster crashes for large datasets Parameters ---------- subtype : str, default=None File type. Eg. 'header'. If empty, all subtypes will be cached repartition : bool, default=False Repartition the dask dataframe Examples -------- >>> from medicaid_utils.preprocessing.taf_file import TAFFile # doctest: +SKIP >>> taf = TAFFile('ip', 2019, 'AL', '/data/cms', tmp_folder='/tmp/cache') # doctest: +SKIP >>> taf.cache_results(subtype='base') # doctest: +SKIP """ for f_subtype in list(self.dct_files.keys()): if (subtype is not None) and (f_subtype != subtype): continue if self.tmp_folder is not None: self.pq_export( f_subtype, os.path.join(self.tmp_folder, f_subtype), repartition=repartition, )
    def pq_export(
        self,
        f_subtype: str,
        dest_path_and_fname: str,
        repartition: bool = False,
    ) -> None:
        """
        Export parquet files (overwrite safe)

        Writes to ``<dest>_tmp`` first and renames over the destination,
        so a failed export does not clobber an existing result. The
        in-memory dataframe for ``f_subtype`` is re-pointed at the newly
        written files afterwards.

        Parameters
        ----------
        f_subtype : str
            File type. Eg. 'header'
        dest_path_and_fname : str
            Destination path
        repartition : bool, default=False
            Repartition the dask dataframe

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_file import TAFFile  # doctest: +SKIP
        >>> taf = TAFFile('ip', 2019, 'AL', '/data/cms')  # doctest: +SKIP
        >>> taf.pq_export('base', '/tmp/output/base')  # doctest: +SKIP
        """
        if repartition:  # and (f_subtype != "dates"): # patch for dates files
            self.dct_files[f_subtype] = self.dct_files[f_subtype].repartition(
                partition_size="100MB", force=True
            )
            self.cache_results(f_subtype)
        if not self.dct_files[f_subtype].known_divisions:
            # Rebuild known divisions by copying the index into a new
            # column and re-indexing on it (sorted=True assumes the data
            # is already ordered by the index)
            self.dct_files[f_subtype] = (
                self.dct_files[f_subtype]
                .assign(
                    **{
                        f"new_{self.index_col}": self.dct_files[
                            f_subtype
                        ].index
                    }
                )
                .set_index(f"new_{self.index_col}", sorted=True)
            )
            self.dct_files[f_subtype].index = self.dct_files[
                f_subtype
            ].index.rename(self.index_col)
        os.makedirs(dest_path_and_fname, exist_ok=True)
        shutil.rmtree(dest_path_and_fname + "_tmp", ignore_errors=True)
        try:
            self.dct_files[f_subtype].to_parquet(
                dest_path_and_fname + "_tmp",
                engine=self.pq_engine,
                write_index=True,
            )
        except (ArrowInvalid, ArrowTypeError):
            # Arrow failed on mixed-type object columns; coerce all object
            # columns (and an object index, if any) to str and retry once
            df = self.dct_files[f_subtype]
            obj_cols = [c for c in df.columns if df[c].dtype == "object"]
            if obj_cols:
                df = df.assign(**{c: df[c].astype(str) for c in obj_cols})
            idx_name = df.index.name
            if idx_name is not None:
                df = df.reset_index()
                if df[idx_name].dtype == "object":
                    df[idx_name] = df[idx_name].astype(str)
                df = df.set_index(idx_name)
            self.dct_files[f_subtype] = df
            self.dct_files[f_subtype].to_parquet(
                dest_path_and_fname + "_tmp",
                engine=self.pq_engine,
                write_index=True,
            )
        # Release the old dataframe before swapping the folders and
        # re-reading from the freshly written parquet files
        del self.dct_files[f_subtype]
        shutil.rmtree(dest_path_and_fname, ignore_errors=True)
        os.rename(dest_path_and_fname + "_tmp", dest_path_and_fname)
        self.dct_files[f_subtype] = dd.read_parquet(
            dest_path_and_fname, index=False, engine=self.pq_engine
        )
        if (self.dct_files[f_subtype].npartitions > 1) or len(
            self.dct_files[f_subtype]
        ) > 0:
            self.dct_files[f_subtype] = self.dct_files[f_subtype].set_index(
                self.index_col, sorted=True
            )
        else:
            # Empty single-partition frame: sorted=True is not applicable
            self.dct_files[f_subtype] = self.dct_files[f_subtype].set_index(
                self.index_col
            )
[docs] def clean(self) -> None: """Cleaning routines to processes date and gender columns, and add duplicate check flags. Examples -------- >>> from medicaid_utils.preprocessing.taf_file import TAFFile # doctest: +SKIP >>> taf = TAFFile('ip', 2019, 'AL', '/data/cms', clean=False) # doctest: +SKIP >>> taf.clean() # doctest: +SKIP """ self.process_date_cols() self.cache_results() self.flag_duplicates() self.cache_results()
    def preprocess(self) -> None:
        """Add basic constructed variables.

        The base-class body is intentionally empty; per the class
        contract, file-type specific subclasses provide the actual
        preprocessing.

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_file import TAFFile  # doctest: +SKIP
        >>> taf = TAFFile('ip', 2019, 'AL', '/data/cms', preprocess=False)  # doctest: +SKIP
        >>> taf.preprocess()  # doctest: +SKIP
        """
[docs] def export( self, dest_folder: str, output_format: str = "csv", repartition: bool = False ) -> None: """ Exports the files. Parameters ---------- dest_folder : str Destination folder output_format : str, default='csv' Export format ('csv' or 'parquet') repartition : bool, default=False Repartition the dask dataframe Examples -------- >>> from medicaid_utils.preprocessing.taf_file import TAFFile # doctest: +SKIP >>> taf = TAFFile('ip', 2019, 'AL', '/data/cms') # doctest: +SKIP >>> taf.export('/tmp/output', output_format='csv') # doctest: +SKIP """ lst_subtype = list(self.dct_files.keys()) for subtype in lst_subtype: if output_format == "csv": self.dct_files[subtype].to_csv( os.path.join( dest_folder, f"{self.ftype}_{subtype}_{self.year}_{self.state}.csv", ), index=True, single_file=True, ) else: self.pq_export( subtype, os.path.join( dest_folder, self.dct_fileloc[subtype].split( self.data_root + ( os.path.sep if not self.data_root.endswith(os.path.sep) else "" ) )[1], ), repartition=repartition and (self.dct_files[subtype].npartitions > 100), )
[docs] def clean_codes(self) -> None: """Clean diagnostic code columns by removing non-alphanumeric characters and converting them to upper case and NDC codes columns by removing white space characters and padding 0s to the left so the codes are of length 12. Examples -------- >>> from medicaid_utils.preprocessing.taf_file import TAFFile # doctest: +SKIP >>> taf = TAFFile('ip', 2019, 'AL', '/data/cms', clean=False) # doctest: +SKIP >>> taf.clean_codes() # doctest: +SKIP """ self.clean_diag_codes() self.clean_ndc_codes() self.clean_proc_codes()
[docs] def clean_diag_codes(self) -> None: """Clean diagnostic code columns by removing non-alphanumeric characters and converting them to upper case. Examples -------- >>> import pandas as pd >>> import dask.dataframe as dd >>> pdf = pd.DataFrame({'DGNS_CD_1': ['a12.3', 'B45-6'], ... 'other_col': [1, 2]}) >>> ddf = dd.from_pandas(pdf, npartitions=1) >>> from medicaid_utils.preprocessing.taf_file import TAFFile >>> obj = object.__new__(TAFFile) >>> obj.dct_files = {'base': ddf} >>> obj.clean_diag_codes() >>> result = obj.dct_files['base'].compute() >>> list(result['DGNS_CD_1']) ['A123', 'B456'] """ for ftype in self.dct_files: df = self.dct_files[ftype] lst_diag_cd_col = [ col for col in df.columns if col.startswith("DGNS_CD_") or (col == "ADMTG_DGNS_CD") ] if len(lst_diag_cd_col) > 0: df = df.map_partitions( lambda pdf, _cols=lst_diag_cd_col: pdf.assign( **{ col: pdf[col] .str.replace("[^a-zA-Z0-9]+", "", regex=True) .str.upper() for col in _cols } ) ) self.dct_files[ftype] = df
[docs] def clean_ndc_codes(self) -> None: """Clean NDC codes columns by removing white space characters and padding 0s to the left so the codes are of length 12. Examples -------- >>> import pandas as pd >>> import dask.dataframe as dd >>> pdf = pd.DataFrame({'NDC': ['1234', ' 5678 ']}) >>> ddf = dd.from_pandas(pdf, npartitions=1) >>> from medicaid_utils.preprocessing.taf_file import TAFFile >>> obj = object.__new__(TAFFile) >>> obj.dct_files = {'line': ddf} >>> obj.clean_ndc_codes() >>> result = obj.dct_files['line'].compute() >>> list(result['NDC']) ['000000001234', '000000005678'] """ for ftype in self.dct_files: df = self.dct_files[ftype] if "NDC" in df.columns: df = df.map_partitions( lambda pdf: pdf.assign( NDC=pdf["NDC"] .astype(str) .str.replace(" ", "") .str.zfill(12) ) ) self.dct_files[ftype] = df
[docs] def clean_proc_codes(self) -> None: """Clean procedure code columns by removing non-alphanumeric characters and converting them to upper case. Examples -------- >>> import pandas as pd >>> import dask.dataframe as dd >>> pdf = pd.DataFrame({'PRCDR_CD_1': ['ab.1', 'C2-d'], ... 'PRCDR_CD_SYS_1': ['ICD', 'CPT']}) >>> ddf = dd.from_pandas(pdf, npartitions=1) >>> from medicaid_utils.preprocessing.taf_file import TAFFile >>> obj = object.__new__(TAFFile) >>> obj.dct_files = {'base': ddf} >>> obj.clean_proc_codes() >>> result = obj.dct_files['base'].compute() >>> list(result['PRCDR_CD_1']) ['AB1', 'C2D'] >>> list(result['PRCDR_CD_SYS_1']) ['ICD', 'CPT'] """ for ftype in self.dct_files: df = self.dct_files[ftype] lst_prcdr_cd_col = [ col for col in df.columns if ( col.startswith("PRCDR_CD") or col.startswith("LINE_PRCDR_CD") ) and ( not ( col.startswith("PRCDR_CD_SYS") or col.startswith("PRCDR_CD_DT") or col.startswith("LINE_PRCDR_CD_SYS") or col.startswith("LINE_PRCDR_CD_DT") ) ) ] if len(lst_prcdr_cd_col) > 0: df = df.map_partitions( lambda pdf, _cols=lst_prcdr_cd_col: pdf.assign( **{ col: pdf[col] .str.replace("[^a-zA-Z0-9]+", "", regex=True) .str.upper() for col in _cols } ) ) self.dct_files[ftype] = df
    def flag_duplicates(self) -> None:
        """
        Removes duplicated rows. TAF claims have multiple versions for each
        month. This function keeps the most recent file version date for
        each month using the variables IP_VRSN, LT_VRSN, OT_VRSN, and
        RX_VRSN. Retains only the claims with maximum value of production
        data run ID (DA_RUN_ID) for each claim ID (CLM_ID).

        References
        ----------
        - `Identifying beneficiaries with a substance use disorder
          <https://www.medicaid.gov/medicaid/data-and-systems/downloads/
          macbis/sud_techspecs.docx>`_

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_file import TAFFile  # doctest: +SKIP
        >>> taf = TAFFile('ip', 2019, 'AL', '/data/cms', clean=False)  # doctest: +SKIP
        >>> taf.flag_duplicates()  # doctest: +SKIP
        """
        for ftype in self.dct_files:
            # Dedup only applies to the main claim subtypes
            if ftype in ["base", "line"]:
                logging.info("Flagging duplicates for %s", ftype)
                df = self.dct_files[ftype]
                df = dataframe_utils.fix_index(df, self.index_col, True)
                # Normalize run id / version columns to ints so the max
                # comparisons below behave consistently (-1 marks missing)
                df = df.assign(
                    **{
                        col: dd.to_numeric(df[col], errors="coerce")
                        .fillna(-1)
                        .astype(int)
                        for col in ["DA_RUN_ID", f"{self.ftype.upper()}_VRSN"]
                        if col in df.columns
                    }
                )
                df = dataframe_utils.fix_index(df, self.index_col, True)
                # Flag exact duplicate rows within each partition.
                # NOTE(review): the selection list is built from
                # pdf.columns (pre-assign), so the _index_col helper
                # column is NOT part of the duplicated() check — looks
                # unintended; confirm whether the index should participate
                df = df.map_partitions(
                    lambda pdf: pdf.assign(
                        excl_duplicated=pdf.assign(_index_col=pdf.index)[
                            [
                                col
                                for col in pdf.columns
                                if col != "excl_duplicated"
                            ]
                        ]
                        .duplicated(keep="first")
                        .astype(int)
                    )
                )
                df = df.loc[df["excl_duplicated"] == 0]
                # Keep only the latest production run for each claim
                if ("DA_RUN_ID" in df.columns) and ("CLM_ID" in df.columns):
                    df = df.map_partitions(
                        lambda pdf: pdf.assign(
                            max_run_id=pdf.groupby("CLM_ID")[
                                "DA_RUN_ID"
                            ].transform("max")
                        )
                    )
                    df = df.loc[df["DA_RUN_ID"] == df["max_run_id"]].drop(
                        "max_run_id", axis=1
                    )
                # Keep only the latest file version within a filing period
                # (filing_period / *_version are added by
                # process_date_cols, so clean() must run that step first)
                if "filing_period" in df.columns:
                    df = df.map_partitions(
                        lambda pdf: pdf.assign(
                            max_version=pdf.groupby("filing_period")[
                                f"{self.ftype.lower()}_version"
                            ].transform("max")
                        )
                    )
                    df = df.loc[
                        df[f"{self.ftype.lower()}_version"]
                        == df["max_version"]
                    ].drop("max_version", axis=1)
                self.dct_files[ftype] = df
    def gather_bene_level_diag_ndc_codes(self) -> None:
        """
        Constructs patient level NDC and diagnosis code list columns and
        saves them to individual files.

        For 'ip'/'ot' claims only: builds the derived subtypes
        ``base_diag_codes`` (per-beneficiary diagnosis code lists from the
        base file) and ``line_ndc_codes`` (per-beneficiary NDC lists from
        the line file). ``LST_*_RAW`` keeps all codes in order;
        ``LST_*`` is de-duplicated (set-based, so order is arbitrary).

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_file import TAFFile  # doctest: +SKIP
        >>> taf = TAFFile('ip', 2019, 'AL', '/data/cms')  # doctest: +SKIP
        >>> taf.gather_bene_level_diag_ndc_codes()  # doctest: +SKIP
        """
        if self.ftype in ["ip", "ot"]:
            df_base = self.dct_files["base"]
            # Collect all diagnosis columns of each row into one list column
            df_base = df_base.map_partitions(
                lambda pdf: pdf.assign(
                    LST_DIAG_CD=pdf[
                        [
                            col
                            for col in pdf.columns
                            if col.startswith("DGNS_CD_")
                            or (col == "ADMTG_DGNS_CD")
                        ]
                    ].values.tolist()
                )
            )
            # Concatenate lists across all rows of a beneficiary; "sum" on
            # lists performs list concatenation within each partition
            df_base = df_base.map_partitions(
                lambda pdf: pdf.groupby(pdf.index).agg({"LST_DIAG_CD": "sum"})
            )
            df_base = df_base.map_partitions(
                lambda pdf: pdf.assign(
                    LST_DIAG_CD_RAW=pdf["LST_DIAG_CD"]
                    .apply(
                        lambda lst: ",".join([cd for cd in lst if bool(cd)])
                    )
                    .fillna(""),
                    LST_DIAG_CD=pdf["LST_DIAG_CD"]
                    .apply(
                        lambda lst: ",".join(set(cd for cd in lst if bool(cd)))
                    )
                    .fillna(""),
                )
            )
            self.add_custom_subtype("base_diag_codes", df_base)
            self.cache_results("base_diag_codes")
            df_line = self.dct_files["line"]
            # Gather each beneficiary's NDC codes into a list column
            df_line = df_line.map_partitions(
                lambda pdf: pdf.groupby(pdf.index)["NDC"]
                .apply(list)
                .reset_index(drop=False)
                .rename(columns={"NDC": "LST_NDC"})
                .set_index(self.index_col)
            )
            df_line = df_line.map_partitions(
                lambda pdf: pdf.assign(
                    LST_NDC_RAW=pdf["LST_NDC"]
                    .apply(lambda lst: ",".join([cd for cd in lst if bool(cd)]))
                    .fillna(""),
                    LST_NDC=pdf["LST_NDC"]
                    .apply(
                        lambda lst: ",".join(set(cd for cd in lst if bool(cd)))
                    )
                    .fillna(""),
                )
            )
            self.add_custom_subtype("line_ndc_codes", df_line)
            self.cache_results("line_ndc_codes")
[docs] def flag_ffs_and_encounter_claims(self) -> None: """ Flags claims where CLM_TYPE_CD is equal to one of the following values: - 1: A FFS Medicaid or Medicaid-expansion claim - 3: Medicaid or Medicaid-expanding managed care encounter record - A: Separate CHIP (Title XXI) FFS claim - C: Separate CHIP (Title XXI) encounter record References ---------- - `Identifying beneficiaries with a substance use disorder <https://www.medicaid.gov/medicaid/data-and-systems/downloads/ macbis/sud_techspecs.docx>`_ Examples -------- >>> from medicaid_utils.preprocessing.taf_file import TAFFile # doctest: +SKIP >>> taf = TAFFile('ip', 2019, 'AL', '/data/cms') # doctest: +SKIP >>> taf.flag_ffs_and_encounter_claims() # doctest: +SKIP >>> 'ffs_or_encounter_claim' in taf.dct_files['base'].columns # doctest: +SKIP True """ df = self.dct_files["base"] df = df.assign( ffs_or_encounter_claim=df["CLM_TYPE_CD"] .str.strip() .isin([1, 3, "A", "C"]) .astype(int) ) self.dct_files["base"] = df
    def process_date_cols(self) -> None:
        """
        Convert datetime columns to datetime type and add basic date based
        constructed variables

        New columns:

        - birth_year, birth_month, birth_day - date components of EL_DOB
          (date of birth)
        - birth_date - birth date (EL_DOB)
        - death - 0 or 1, if DEATH_DT is not empty and falls in the claim
          year or before
        - age - age in years, integer format
        - age_day - age in days
        - adult - 0 or 1, 1 when patient's age is in [18,115]
        - child - 0 or 1, 1 when patient's age is in [0,17]

        If ftype == 'ip':
            Clean/ impute admsn_date and add ip duration related columns

            New column(s):

            - admsn_date - Admission date (ADMSN_DT)
            - srvc_bgn_date - Service begin date (SRVC_BGN_DT)
            - srvc_end_date - Service end date (SRVC_END_DT)
            - prncpl_proc_date - Principal procedure date (PRCDR_CD_DT_1)
            - missing_admsn_date - 0 or 1, 1 when admission date is missing
            - missing_prncpl_proc_date - 0 or 1, 1 when principal procedure
              date is missing
            - flag_admsn_miss - 0 or 1, 1 when admsn_date was imputed
            - los - ip service duration in days
            - ageday_admsn - age in days as on admsn_date
            - age_admsn - age in years, with decimals, as on admsn_date
            - age_prncpl_proc - age in years as on principal procedure date
            - age_day_prncpl_proc - age in days as on principal procedure
              date

        if ftype == 'ot':
            Adds duration column, provided service end and begin dates are
            clean

            New Column(s):

            - srvc_bgn_date - Service begin date (SRVC_BGN_DT)
            - srvc_end_date - Service end date (SRVC_END_DT)
            - diff & duration - duration of service in days
            - age_day_srvc_bgn - age in days as on service begin date
            - age_srvc_bgn - age in years, with decimals, as on service
              begin date

        if ftype == 'ps:
            New Column(s):

            - date_of_death - Date of death (DEATH_DT)

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_file import TAFFile  # doctest: +SKIP
        >>> taf = TAFFile('ip', 2019, 'AL', '/data/cms', clean=False)  # doctest: +SKIP
        >>> taf.process_date_cols()  # doctest: +SKIP
        """
        for ftype in self.dct_files:
            logging.info("Processing date columns for %s", ftype)
            df = self.dct_files[ftype]
            if self.ftype in ["ip", "lt", "ot", "ps", "rx"]:
                # Raw TAF column -> cleaned column name.
                # NOTE(review): "ADMSM_DT" looks like it covers a typo'd
                # column name in some raw releases — confirm against the
                # source files
                dct_date_col = {
                    "BIRTH_DT": "birth_date",
                    "ADMSM_DT": "admsn_date",
                    "ADMSN_DT": "admsn_date",
                    "PRCDR_CD_DT_1": "prncpl_proc_date",
                    "SRVC_BGN_DT": "srvc_bgn_date",
                    "SRVC_END_DT": "srvc_end_date",
                    "LINE_SRVC_BGN_DT": "srvc_bgn_date",
                    "LINE_SRVC_END_DT": "srvc_end_date",
                    "DEATH_DT": "death_date",
                    "IMGRTN_STUS_5YR_BAR_END_DT": "immigration_status_5yr_bar_end_date",
                    "ENRLMT_START_DT": "enrollment_start_date",
                    "ENRLMT_END_DT": "enrollment_end_date",
                    "RX_FILL_DT": "rx_fill_date",
                    "PRSCRBD_DT": "prescribed_date",
                }
                # Keep only the mappings whose raw column actually exists
                dct_date_col = {
                    col: new_col_name
                    for col, new_col_name in dct_date_col.items()
                    if col in df.columns
                }
                df = df.assign(
                    **{
                        new_name: df[col]
                        for col, new_name in dct_date_col.items()
                    }
                )
                # converting lst_col columns to datetime type
                lst_col_to_convert = list(
                    {
                        dct_date_col[col] for col in dct_date_col.keys()
                    }.difference(
                        set(df.select_dtypes(include=[np.datetime64]).columns)
                    )
                )
                if bool(lst_col_to_convert):
                    df = df.assign(
                        **{
                            col: dd.to_datetime(df[col], errors="coerce")
                            for col in lst_col_to_convert
                        }
                    )
                if self.ftype in ["ip", "ot", "rx"]:
                    # Derive filing_period / year / <ftype>_version from
                    # the file date column; the parsing layout varies by
                    # claim year and type
                    if f"{self.ftype.upper()}_FIL_DT" in df.columns:
                        if (self.year in [2018, 2014]) or (
                            (self.year == 2017)
                            & (self.ftype in ["ot", "rx"])
                        ):
                            # DDMONYYYY style; "01JAN1000" is a sentinel
                            # for missing filing dates
                            df = df.assign(
                                filing_period=df[
                                    f"{self.ftype.upper()}_FIL_DT"
                                ]
                                .fillna("01JAN1000")
                                .str.upper()
                            )
                            df = df.assign(
                                year=df.filing_period.str[-4:].astype(int)
                            )
                        else:
                            # YYYYMM style (ip files carry one leading
                            # character before the period)
                            if self.ftype == "ip":
                                df = df.assign(
                                    filing_period=df[
                                        f"{self.ftype.upper()}_FIL_DT"
                                    ].str[1:7],
                                )
                            else:
                                df = df.assign(
                                    filing_period=df[
                                        f"{self.ftype.upper()}_FIL_DT"
                                    ].str[0:6],
                                )
                            df = df.assign(
                                year=df.filing_period.str[:4].astype(int)
                            )
                        df = df.assign(
                            **{
                                f"{self.ftype.lower()}_version": dd.to_numeric(
                                    df[f"{self.ftype.upper()}_VRSN"],
                                    errors="coerce",
                                )
                            }
                        )
                else:
                    # ps/lt: claim year comes from the reference year column
                    df = df.assign(
                        year=dd.to_numeric(
                            df.RFRNC_YR, errors="coerce"
                        ).astype(int)
                    )
                # Remove abnormally large or low date values. The bounds
                # used are Dec 31, 1800 and Dec 31, 2100
                if any(
                    col in list(dct_date_col.values()) for col in df.columns
                ):
                    df = df.assign(
                        **{
                            col: df[col].where(
                                df[col].between(
                                    pd.to_datetime(
                                        "18001231", errors="coerce"
                                    ),
                                    pd.to_datetime(
                                        "21001231", errors="coerce"
                                    ),
                                    inclusive="both",
                                ),
                                pd.to_datetime("", errors="coerce"),
                            )
                            for col in list(dct_date_col.values())
                            if col in df.columns
                        }
                    )
                if "birth_date" in df.columns:
                    df = df.assign(
                        birth_year=df.birth_date.dt.year,
                        birth_month=df.birth_date.dt.month,
                        birth_day=df.birth_date.dt.day,
                    )
                    # Age in days as of Dec 31 of the claim year
                    df = df.assign(
                        age_day=(
                            dd.to_datetime(
                                df["year"].astype(str) + "1231",
                                errors="coerce",
                            )
                            - df["birth_date"]
                        ).dt.days
                    )
                    if "AGE" not in df.columns:
                        df = df.assign(age=df.year - df.birth_year)
                # Prefer the reported AGE column when present
                if "AGE" in df.columns:
                    df = df.assign(
                        age=dd.to_numeric(df["AGE"], errors="coerce")
                    )
                if "AGE_GRP_CD" in df.columns:
                    df = df.assign(
                        age_group=dd.to_numeric(
                            df["AGE_GRP_CD"], errors="coerce"
                        )
                        .fillna(-1)
                        .astype(int),
                    )
                if "age" in df.columns:
                    # -1 marks missing age in the 0/1 flags below
                    df = df.assign(
                        adult=df["age"]
                        .between(18, 64, inclusive="both")
                        .astype(pd.Int64Dtype())
                        .fillna(-1)
                        .astype(int),
                        elderly=(df["age"] >= 65)
                        .astype(pd.Int64Dtype())
                        .fillna(-1)
                        .astype(int),
                        child=(df["age"] <= 17)
                        .astype(pd.Int64Dtype())
                        .fillna(-1)
                        .astype(int),
                    )
                if (self.ftype != "ps") and ("age" in df.columns):
                    # Claim files: take the per-beneficiary maximum within
                    # each partition so all of a bene's rows agree
                    df = df.map_partitions(
                        lambda pdf: pdf.assign(
                            adult=pdf.groupby(pdf.index)["adult"].transform(
                                "max"
                            ),
                            child=pdf.groupby(pdf.index)["child"].transform(
                                "max"
                            ),
                            age=pdf.groupby(pdf.index)["age"].transform("max"),
                            age_day=pdf.groupby(pdf.index)[
                                "age_day"
                            ].transform("max"),
                        )
                    )
                if "death_date" in df.columns:
                    # death = 1 when death year <= claim year; missing
                    # death dates are pushed past the claim year (year+10)
                    df = df.assign(
                        death=(
                            df.death_date.dt.year.fillna(df.year + 10).astype(
                                int
                            )
                            <= df.year
                        )
                        .fillna(False)
                        .astype(int)
                    )
                if "DEATH_IND" in df.columns:
                    df = df.assign(
                        death=(
                            dd.to_numeric(df["DEATH_IND"], errors="coerce")
                            == 1
                        )
                        .fillna(False)
                        .astype(int)
                    )
                if (self.ftype == "ip") and ("admsn_date" in df.columns):
                    df = df.assign(
                        missing_admsn_date=df["admsn_date"]
                        .isnull()
                        .astype(int)
                    )
                    # Impute missing admission dates from service begin
                    df = df.assign(
                        admsn_date=df["admsn_date"].where(
                            ~df["admsn_date"].isnull(), df["srvc_bgn_date"]
                        ),
                    )
                    df = df.assign(
                        los=(df["srvc_end_date"] - df["admsn_date"]).dt.days
                        + 1
                    )
                    # Invalidate LOS when dates are inconsistent with the
                    # claim year or each other
                    df = df.assign(
                        los=df["los"].where(
                            (
                                (df["year"] >= df["admsn_date"].dt.year)
                                & (df["admsn_date"] <= df["srvc_end_date"])
                            ),
                            np.nan,
                        )
                    )
                    df = df.assign(
                        age_day_admsn=(
                            df["admsn_date"] - df["birth_date"]
                        ).dt.days
                    )
                    df = df.assign(
                        age_admsn=(
                            df["age_day_admsn"].fillna(0) / 365.25
                        ).astype(int),
                    )
                if (self.ftype == "ip") and (
                    "prncpl_proc_date" in df.columns
                ):
                    df = df.assign(
                        prncpl_proc_date=df["prncpl_proc_date"].fillna(
                            df["admsn_date"]
                        )
                    )
                    # Use the earliest principal procedure date within each
                    # (beneficiary, claim) group
                    df = df.map_partitions(
                        lambda pdf: pdf.assign(
                            prncpl_proc_date=pdf.groupby(
                                [pdf.index, "CLM_ID"]
                            )["prncpl_proc_date"].transform("min")
                        )
                    )
                    if "birth_date" in df.columns:
                        df = df.assign(
                            age_day_prncpl_proc=(
                                df["prncpl_proc_date"] - df["birth_date"]
                            ).dt.days
                        )
                        df = df.assign(
                            age_prncpl_proc=(
                                df["age_day_prncpl_proc"].fillna(0) / 365.25
                            ).astype(int),
                        )
                if (self.ftype == "ot") and ("srvc_bgn_date" in df.columns):
                    # Use the earliest service begin date within each
                    # (beneficiary, claim) group
                    df = df.map_partitions(
                        lambda pdf: pdf.assign(
                            srvc_bgn_date=pdf.groupby([pdf.index, "CLM_ID"])[
                                "srvc_bgn_date"
                            ].transform("min")
                        )
                    )
                    df = df.assign(
                        duration=(
                            df["srvc_end_date"] - df["srvc_bgn_date"]
                        ).dt.days
                    )
                    # Invalidate duration when begin date is after end date
                    df = df.assign(
                        duration=df["duration"].where(
                            (df["srvc_bgn_date"] <= df["srvc_end_date"]),
                            np.nan,
                        )
                    )
                    if "birth_date" in df.columns:
                        df = df.assign(
                            age_day_srvc_bgn=(
                                df["srvc_bgn_date"] - df["birth_date"]
                            ).dt.days
                        )
                        df = df.assign(
                            age_srvc_bgn=(
                                df["age_day_srvc_bgn"].fillna(0) / 365.25
                            ).astype(int),
                        )
            self.dct_files[ftype] = df