Source code for medicaid_utils.adapted_algorithms.py_cdpsmrx.cdps_rx_risk_adjustment

import os
from typing import List, Tuple

import numpy as np
import pandas as pd
import dask.dataframe as dd


class CdpsRxRiskAdjustment:
    """Adapted CDPS + Medicaid Rx (CDPS MRX) risk adjustment model.

    The class only carries classmethods plus class-level attributes that
    locate the reference data shipped with the package (CDPS/MRX category
    maps and the coefficient workbook read by ``calculate_risk``).
    """

    # Folder containing this module; ``filename`` is the module's file name.
    package_folder, filename = os.path.split(__file__)
    # Reference data (CSV category maps, weighting workbook) lives in ./data
    data_folder = os.path.join(package_folder, "data")
[docs] @classmethod def add_age_and_gender_cols( cls, df: dd.DataFrame ) -> Tuple[dd.DataFrame, List[str]]: """ Adds age & gender related covariates used in CDPS Rx Risk Adjustment model. Returns a dataframe with the below columns: * a_under1 - 0 or 1, age<=1 * a_1_4 - 0 or 1, 1<age<5 * a_5_14m - 0 or 1, 5<=age<15 male * a_5_14f - 0 or 1, 5<=age<15 female * a_15_24m - 0 or 1, 15<=age<25 male * a_15_24f - 0 or 1, 15<=age<25 female * a_25_44m - 0 or 1, 25<=age<45 male * a_25_44f - 0 or 1, 25<=age<45 female * a_45_64m - 0 or 1, 45<=age<65 male * a_45_64f - 0 or 1, 45<=age<65 female * a_65 - 0 or 1, 65<=age Parameters ---------- df - dask.DataFrame Returns ------- dask.DataFrame list of str List of age and gender column names added. Examples -------- >>> # Requires a dask DataFrame with 'Female' and 'age' columns >>> df, lst_cols = CdpsRxRiskAdjustment.add_age_and_gender_cols(df) # doctest: +SKIP >>> 'a_under1' in lst_cols # doctest: +SKIP True """ lst_age_and_gender_cols = [ "a_under1", "a_1_4", "a_5_14m", "a_5_14f", "a_15_24m", "a_15_24f", "a_25_44m", "a_25_44f", "a_45_64m", "a_45_64f", "a_65", ] df = df.assign( male=df["Female"].where( df["Female"].isna(), (df["Female"] == 0).astype(int) ) ) df = df.assign( a_under1=((df["age"] <= 1) | df["age"].isna()).astype(int), a_1_4=(df["age"].between(1, 5, inclusive="neither")).astype(int), a_5_14m=( (df["age"] >= 5) & (df["age"] < 15) & (df["male"] == 1) ).astype(int), a_5_14f=( (df["age"] >= 5) & (df["age"] < 15) & (df["male"] == 0) ).astype(int), a_15_24m=( (df["age"] >= 15) & (df["age"] < 25) & (df["male"] == 1) ).astype(int), a_15_24f=( (df["age"] >= 15) & (df["age"] < 25) & (df["male"] == 0) ).astype(int), a_25_44m=( (df["age"] >= 25) & (df["age"] < 45) & (df["male"] == 1) ).astype(int), a_25_44f=( (df["age"] >= 25) & (df["age"] < 45) & (df["male"] == 0) ).astype(int), a_45_64m=( (df["age"] >= 45) & (df["age"] < 65) & (df["male"] == 1) ).astype(int), a_45_64f=( (df["age"] >= 45) & (df["age"] < 65) & (df["male"] == 0) 
).astype(int), a_65=(df["age"] >= 65).astype(int), ) return df, lst_age_and_gender_cols
    @classmethod
    def add_cdps_category_cols(  # pylint: disable=too-many-locals
        cls, df: dd.DataFrame, lst_diag_cd_col_name: str
    ) -> Tuple[dd.DataFrame, List[str]]:
        """
        Adds CDPS category columns

        Maps the comma separated diagnosis-code list column to 0/1 CDPS
        category indicator columns, applying the CDPS hierarchies (only
        the most severe flagged category per hierarchy stays 1) and the
        adult/child distinction driven by the ``aid`` column.

        Parameters
        ----------
        df : dask.DataFrame
            PS Dataframe with LST_DIAG_CD column
        lst_diag_cd_col_name : str

        Returns
        -------
        dask.DataFrame
        list of str
            List of CDPS category column names added.

        Examples
        --------
        >>> # Requires a dask DataFrame with diagnosis code list and aid columns
        >>> df, lst_cols = CdpsRxRiskAdjustment.add_cdps_category_cols(  # doctest: +SKIP
        ...     df, 'LST_DIAG_CD')
        """
        # Load var lists and maps
        data_folder = os.path.join(cls.data_folder, "cdps")
        pdf_var_map = pd.read_csv(
            os.path.join(data_folder, "cdps_hierarchical_vars.csv")
        )
        # Split the diagnosis-code -> category-label map into adult
        # (adult == 1) and child (adult == 0) variants.
        pdf_adult_var_map = pdf_var_map.loc[pdf_var_map["adult"] == 1]
        dct_adult_diag_var = (
            pdf_adult_var_map[["val", "label"]]
            .set_index("val")
            .to_dict()["label"]
        )
        pdf_child_var_map = pdf_var_map.loc[pdf_var_map["adult"] == 0]
        dct_child_diag_var = (
            pdf_child_var_map[["val", "label"]]
            .set_index("val")
            .to_dict()["label"]
        )
        lst_interaction_vars = [
            lbl.strip()
            for lbl in pd.read_csv(
                os.path.join(data_folder, "lst_interaction_vars.csv"),
                header=None,
            )[0].tolist()
        ]
        lst_regression_vars = [
            lbl.strip()
            for lbl in pd.read_csv(
                os.path.join(data_folder, "lst_regr_var_cdps.csv"), header=None
            )[0].tolist()
        ]
        lst_adult_vars = [
            lbl.strip()
            for lbl in pd.read_csv(
                os.path.join(data_folder, "lst_cdps_categories_adult.csv"),
                header=None,
            )[0].tolist()
        ]
        lst_child_vars = [
            lbl.strip()
            for lbl in pd.read_csv(
                os.path.join(data_folder, "lst_cdps_categories_child.csv"),
                header=None,
            )[0].tolist()
        ]
        # Categories with level == 0 are non-hierarchical (independent
        # flags); level != 0 rows define severity hierarchies.
        lst_non_hierarchical_diags_adult = (
            pdf_adult_var_map.loc[pdf_adult_var_map["level"] == 0]["label"]
            .unique()
            .tolist()
        )
        lst_non_hierarchical_diags_child = (
            pdf_child_var_map.loc[pdf_child_var_map["level"] == 0]["label"]
            .unique()
            .tolist()
        )
        pdf_adult_hierarchical_labels = (
            pdf_adult_var_map.loc[pdf_adult_var_map["level"] != 0][
                ["label", "level", "level_order"]
            ]
            .drop_duplicates()
            .sort_values(by=["level", "level_order"], ascending=True)
        )
        pdf_child_hierarchical_labels = (
            pdf_child_var_map.loc[pdf_child_var_map["level"] != 0][
                ["label", "level", "level_order"]
            ]
            .drop_duplicates()
            .sort_values(by=["level", "level_order"], ascending=True)
        )
        # Hierarchical labels present in BOTH adult and child maps get
        # merged (child value layered over adult) later on.
        lst_common_hierarchical_labels = list(
            set(pdf_adult_hierarchical_labels.label.tolist()).intersection(
                set(pdf_child_hierarchical_labels.label.tolist())
            )
        )
        # Shape: {level: {level_order: label}}; level_order 1 is the most
        # severe category within its hierarchy.
        dct_diag_hierarchies_adult = (
            pdf_adult_hierarchical_labels.set_index(["level", "level_order"])
            .groupby(level=0)
            .apply(lambda pdf: pdf.xs(pdf.name)["label"].to_dict())
            .to_dict()
        )
        dct_diag_hierarchies_child = (
            pdf_child_hierarchical_labels.set_index(["level", "level_order"])
            .groupby(level=0)
            .apply(lambda pdf: pdf.xs(pdf.name)["label"].to_dict())
            .to_dict()
        )
        # Fix missing levels in child diag hierarchies: renumber
        # level_order to a contiguous 1..n sequence when gaps exist, so
        # the "[: level_order - 1]" prefix-slices below stay correct.
        for level in list(dct_diag_hierarchies_child.keys()):
            if len(list(dct_diag_hierarchies_child[level].keys())) != max(
                list(dct_diag_hierarchies_child[level].keys())
            ):
                dct_level = {
                    idx + 1: item[1]
                    for idx, item in enumerate(
                        list(
                            sorted(
                                dct_diag_hierarchies_child[level].items()
                            )
                        )
                    )
                }
                dct_diag_hierarchies_child[level] = dct_level
        # Store list of new columns
        lst_cdps_cat_cols = list(
            set(
                lst_adult_vars
                + lst_child_vars
                + lst_interaction_vars
                + ["NONE", "OTHER", "NOCDPS"]
            )
        )
        # NONE flags beneficiaries with no diagnosis codes at all.
        df = df.assign(
            NONE=(
                df[lst_diag_cd_col_name].isna()
                | (df[lst_diag_cd_col_name].str.strip() == "")
            ).astype(int)
        )
        # Seed both per-population label-list columns with empty lists.
        df["LST_ADULT_DIAG_CD"] = ""
        df = df.assign(
            LST_ADULT_DIAG_CD=df["LST_ADULT_DIAG_CD"].str.split(",")
        )
        # Populate the adult label list only for aid codes DA/AA; each raw
        # diagnosis code is normalized (strip, upper, drop '.') and mapped
        # to its CDPS label, defaulting to "OTHER".
        # NOTE(review): aid appears to encode the eligibility/population
        # group — DA/AA treated as adult, DC/AC as child; confirm.
        df = df.assign(
            LST_CHILD_DIAG_CD=df["LST_ADULT_DIAG_CD"],
            LST_ADULT_DIAG_CD=df["LST_ADULT_DIAG_CD"].where(
                ~df["aid"].isin(["DA", "AA"]),
                df[lst_diag_cd_col_name].apply(
                    lambda lst_raw_diag: list(
                        {
                            dct_adult_diag_var.get(
                                diag_cd.strip().upper().replace(".", ""),
                                "OTHER",
                            )
                            for diag_cd in str(lst_raw_diag).split(",")
                        }
                    )
                    if pd.notna(lst_raw_diag)
                    else []
                ),
            ),
        )
        # Same mapping through the child map for aid codes DC/AC.
        df = df.assign(
            LST_CHILD_DIAG_CD=df["LST_CHILD_DIAG_CD"].where(
                ~df["aid"].isin(["DC", "AC"]),
                df[lst_diag_cd_col_name].apply(
                    lambda lst_raw_diag: list(
                        {
                            dct_child_diag_var.get(
                                diag_cd.strip().upper().replace(".", ""),
                                "OTHER",
                            )
                            for diag_cd in str(lst_raw_diag).split(",")
                        }
                    )
                    if pd.notna(lst_raw_diag)
                    else []
                ),
            )
        )
        # OTHER = 1 when any diagnosis failed to map to a CDPS category
        # for the row's population. pd.DataFrame(...tolist()) expands the
        # list column so .isin/.any can scan all labels per row.
        df = df.map_partitions(
            lambda pdf: pdf.assign(
                OTHER=(
                    (
                        pdf["aid"].isin(["DA", "AA"])
                        & pd.DataFrame(
                            pdf["LST_ADULT_DIAG_CD"].tolist(), index=pdf.index
                        )
                        .isin(["OTHER"])
                        .any(axis=1)
                        | (
                            pdf["aid"].isin(["DC", "AC"])
                            & pd.DataFrame(
                                pdf["LST_CHILD_DIAG_CD"].tolist(),
                                index=pdf.index,
                            )
                            .isin(["OTHER"])
                            .any(axis=1)
                        )
                    ).astype(int)
                )
            ),
            meta=df._meta.assign(OTHER=0),
        )
        # Non-hierarchical adult categories: simple membership flags,
        # restricted to the adult population.
        _meta_adult_nh = df._meta.assign(
            **{diag: 0 for diag in lst_non_hierarchical_diags_adult}
        )
        df = df.map_partitions(
            lambda pdf: pdf.assign(
                **{
                    diag: (
                        pdf["aid"].isin(["DA", "AA"])
                        & pd.DataFrame(
                            pdf["LST_ADULT_DIAG_CD"].tolist(),
                            index=pdf.index,
                        )
                        .isin([diag])
                        .any(axis=1)
                    ).astype(int)
                    for diag in lst_non_hierarchical_diags_adult
                }
            ),
            meta=_meta_adult_nh,
        )
        # Non-hierarchical child categories: child-only labels become new
        # columns; labels shared with the adult list overwrite the adult
        # value only on child rows (via .where).
        _meta_child_nh = df._meta.assign(
            **{diag: 0 for diag in lst_non_hierarchical_diags_child}
        )
        df = df.map_partitions(
            lambda pdf: pdf.assign(
                **dict(
                    [
                        (
                            diag,
                            (
                                pdf["aid"].isin(["DC", "AC"])
                                & pd.DataFrame(
                                    pdf["LST_CHILD_DIAG_CD"].tolist(),
                                    index=pdf.index,
                                )
                                .isin([diag])
                                .any(axis=1)
                            ).astype(int),
                        )
                        for diag in lst_non_hierarchical_diags_child
                        if diag not in lst_non_hierarchical_diags_adult
                    ]
                    + [
                        (
                            diag,
                            pdf[diag].where(
                                ~pdf["aid"].isin(["DC", "AC"]),
                                pd.DataFrame(
                                    pdf["LST_CHILD_DIAG_CD"].tolist(),
                                    index=pdf.index,
                                )
                                .isin([diag])
                                .any(axis=1)
                                .astype(int),
                            ),
                        )
                        for diag in lst_non_hierarchical_diags_child
                        if diag in lst_non_hierarchical_diags_adult
                    ]
                )
            ),
            meta=_meta_child_nh,
        )
        # Hierarchical adult categories: a category is 1 only if present
        # AND no more-severe category of the same hierarchy is present.
        # NOTE: the inner list comprehension deliberately shadows
        # ``level_order``/``diag``; the slice bound "[: level_order - 1]"
        # is evaluated in the enclosing scope (outer level_order), which
        # selects the more-severe prefix of the hierarchy.
        _meta_adult_h = df._meta.assign(
            **{
                diag: 0
                for dct_level in dct_diag_hierarchies_adult.values()
                for diag in dct_level.values()
            }
        )
        df = df.map_partitions(
            lambda pdf: pdf.assign(
                **{
                    diag: (
                        pdf["aid"].isin(["DA", "AA"])
                        & pd.DataFrame(
                            pdf["LST_ADULT_DIAG_CD"].tolist(),
                            index=pdf.index,
                        )
                        .isin([diag])
                        .any(axis=1)
                        & (
                            (level_order == 1)
                            | (
                                ~pd.DataFrame(
                                    pdf[
                                        "LST_ADULT_DIAG_CD"
                                    ].tolist(),
                                    index=pdf.index,
                                )
                                .isin(
                                    [
                                        diag
                                        for level_order, diag in sorted(
                                            dct_level.items()
                                        )[
                                            : level_order - 1
                                        ]
                                    ]
                                )
                                .any(axis=1)
                            )
                        )
                    ).astype(int)
                    for level, dct_level in dct_diag_hierarchies_adult.items()
                    for level_order, diag in sorted(
                        dct_level.items()
                    )
                }
            ),
            meta=_meta_adult_h,
        )
        # Hierarchical child categories: same suppression logic; labels
        # shared with the adult hierarchies only overwrite on child rows.
        _meta_child_h = df._meta.assign(
            **{
                diag: 0
                for dct_level in dct_diag_hierarchies_child.values()
                for diag in dct_level.values()
            }
        )
        df = df.map_partitions(
            lambda pdf: pdf.assign(
                **dict(  # pylint: disable=consider-using-dict-comprehension
                    [
                        item
                        for sublist in [
                            [
                                (
                                    diag,
                                    (
                                        pdf["aid"].isin(["DC", "AC"])
                                        & pd.DataFrame(
                                            pdf["LST_CHILD_DIAG_CD"].tolist(),
                                            index=pdf.index,
                                        )
                                        .isin([diag])
                                        .any(axis=1)
                                        & (
                                            (level_order == 1)
                                            | (
                                                ~pd.DataFrame(
                                                    pdf[
                                                        "LST_CHILD_DIAG_CD"
                                                    ].tolist(),
                                                    index=pdf.index,
                                                )
                                                .isin(
                                                    [
                                                        diag
                                                        for level_order, diag in sorted(
                                                            dct_level.items()
                                                        )[
                                                            : level_order - 1
                                                        ]
                                                    ]
                                                )
                                                .any(axis=1)
                                            )
                                        )
                                    ).astype(int),
                                )
                                for level_order, diag in sorted(
                                    dct_level.items()
                                )
                                if diag not in lst_common_hierarchical_labels
                            ]
                            + [
                                (
                                    diag,
                                    pdf[diag].where(
                                        ~pdf["aid"].isin(["DC", "AC"]),
                                        (
                                            pd.DataFrame(
                                                pdf[
                                                    "LST_CHILD_DIAG_CD"
                                                ].tolist(),
                                                index=pdf.index,
                                            )
                                            .isin([diag])
                                            .any(axis=1)
                                            & (
                                                (level_order == 1)
                                                | (
                                                    ~pd.DataFrame(
                                                        pdf[
                                                            "LST_CHILD_DIAG_CD"
                                                        ].tolist(),
                                                        index=pdf.index,
                                                    )
                                                    .isin(
                                                        [
                                                            diag
                                                            for level_order, diag in sorted(
                                                                dct_level.items()
                                                            )[
                                                                : level_order - 1
                                                            ]
                                                        ]
                                                    )
                                                    .any(axis=1)
                                                )
                                            )
                                        ).astype(int),
                                    ),
                                )
                                for level_order, diag in sorted(
                                    dct_level.items()
                                )
                                if diag in lst_common_hierarchical_labels
                            ]
                            for level, dct_level in dct_diag_hierarchies_child.items()
                        ]
                        for item in sublist
                    ]
                )
            ),
            meta=_meta_child_h,
        )
        # These categories are zeroed for child rows — presumably
        # adult-only in the model; verify against the CDPS spec.
        df = df.assign(
            **{
                diag: df[diag].where(~df["aid"].isin(["DC", "AC"]), 0)
                for diag in ["CANB", "DIA1H", "DIA1M", "DIA2M", "EYEL"]
            }
        )
        # For the AA/AC/AG groups, PULVH is folded into PULH (and DDM
        # into DDL for AA/AG), then the folded-in column is zeroed.
        df = df.assign(
            PULH=df["PULH"].where(
                ~df["aid"].isin(["AA", "AC", "AG"]),
                df[["PULH", "PULVH"]].max(axis=1),
            ),
            PULVH=df["PULVH"].where(~df["aid"].isin(["AA", "AC", "AG"]), 0),
            DDL=df["DDL"].where(
                ~df["aid"].isin(["AA", "AG"]), df[["DDL", "DDM"]].max(axis=1)
            ),
            DDM=df["DDM"].where(~df["aid"].isin(["AA", "AG"]), 0),
        )
        # NOCDPS = 1 when no regression covariate fired at all.
        df = df.assign(
            NOCDPS=(df[lst_regression_vars].sum(axis=1) == 0).astype(int)
        )
        # Interaction vars: each name minus its first character is the
        # base category; the interaction copies the base value for DC
        # rows and is 0 otherwise, ...
        df = df.assign(
            **{
                idiag: df[diag].where(df["aid"].isin(["DC"]), 0)
                for idiag, diag in zip(
                    lst_interaction_vars,
                    [diag[1:] for diag in lst_interaction_vars],
                )
            }
        )
        # ... then set to NaN for any population other than DC/DA.
        df = df.assign(
            **{
                diag: df[diag].where(df["aid"].isin(["DC", "DA"]), np.nan)
                for diag in lst_interaction_vars
            }
        )
        # Drop the intermediate label-list columns before returning.
        return (
            df[
                [
                    col
                    for col in df.columns
                    if col not in ["LST_ADULT_DIAG_CD", "LST_CHILD_DIAG_CD"]
                ]
            ],
            lst_cdps_cat_cols,
        )
    @classmethod
    def add_mrx_cat_cols(
        cls, df: dd.DataFrame, lst_ndc_col_name: str = "LST_NDC"
    ) -> Tuple[dd.DataFrame, List[str]]:
        """
        Adds MRX category columns

        Maps the comma separated NDC list column to 0/1 MRX category
        indicator columns, applying the MRX hierarchies (only the most
        severe flagged category per hierarchy stays 1).

        Parameters
        ----------
        df : dask.DataFrame
        lst_ndc_col_name : str, default='LST_NDC'

        Returns
        -------
        dask.DataFrame
        list of str
            List of MRX category column names added.

        Examples
        --------
        >>> # Requires a dask DataFrame with an NDC code list column
        >>> df, lst_cols = CdpsRxRiskAdjustment.add_mrx_cat_cols(  # doctest: +SKIP
        ...     df, 'LST_NDC')
        """
        # Load var lists and maps
        data_folder = os.path.join(cls.data_folder, "mrx")
        pdf_ndc_mrx_cat_map = pd.read_csv(
            os.path.join(data_folder, "ndc_mrx_cat_map.csv"), dtype=object
        )
        # NDC (11-digit, zero padded) -> MRX category label
        dct_ndc_mrx_cat = pdf_ndc_mrx_cat_map.set_index("NDC").to_dict()[
            "MRX_CAT"
        ]
        pdf_mrx_cat_hierarchies = pd.read_csv(
            os.path.join(data_folder, "mrx_cat_hierarchies.csv"), dtype=object
        )
        pdf_mrx_cat_hierarchies[
            ["level", "level_order"]
        ] = pdf_mrx_cat_hierarchies[["level", "level_order"]].astype(int)
        # level == 0 rows are independent flags; level != 0 rows define
        # severity hierarchies, level_order 1 being the most severe.
        lst_non_hierarchical_mrx_cat = (
            pdf_mrx_cat_hierarchies.loc[pdf_mrx_cat_hierarchies["level"] == 0][
                "mrx_cat"
            ]
            .unique()
            .tolist()
        )
        pdf_hierarchical_mrx_cats = (
            pdf_mrx_cat_hierarchies.loc[pdf_mrx_cat_hierarchies["level"] != 0][
                ["mrx_cat", "level", "level_order"]
            ]
            .drop_duplicates()
            .sort_values(by=["level", "level_order"], ascending=True)
        )
        # Shape: {level: {level_order: mrx_cat}}
        dct_mrx_hierarchies = (
            pdf_hierarchical_mrx_cats.set_index(["level", "level_order"])
            .groupby(level=0)
            .apply(lambda df: df.xs(df.name)["mrx_cat"].to_dict())
            .to_dict()
        )
        # Store list of columns in output file
        lst_mrx_cat_cols = list(
            set(
                pdf_mrx_cat_hierarchies["mrx_cat"].unique().tolist()
                + ["NONE", "OTHER"]
            )
        )
        # NONE flags rows without any NDC codes; LST_NDC_MRX holds the
        # deduplicated MRX labels per row, with unmapped codes -> "OTHER".
        # Each raw NDC is normalized (strip, upper, drop '.', zero pad to
        # 11 digits) before the lookup.
        df = df.assign(
            NONE=(
                df[lst_ndc_col_name].isna()
                | (df[lst_ndc_col_name].str.strip() == "")
            ).astype(int),
            LST_NDC_MRX=df[lst_ndc_col_name].apply(
                lambda lst_ndc: list(
                    {
                        dct_ndc_mrx_cat.get(
                            ndc_cd.strip()
                            .upper()
                            .replace(".", "")
                            .zfill(11),
                            "OTHER",
                        )
                        for ndc_cd in str(lst_ndc).split(",")
                    }
                )
                if pd.notna(lst_ndc)
                else []
            ),
        )
        # OTHER = 1 when any NDC failed to map; the list column is
        # expanded via pd.DataFrame(...tolist()) so .isin/.any can scan
        # every label per row.
        df = df.map_partitions(
            lambda pdf: pdf.assign(
                OTHER=pd.DataFrame(
                    pdf["LST_NDC_MRX"].tolist(), index=pdf.index
                )
                .isin(["OTHER"])
                .any(axis=1)
                .astype(int)
            ),
            meta=df._meta.assign(OTHER=0),
        )
        # Non-hierarchical MRX categories: simple membership flags.
        _meta_mrx_nh = df._meta.assign(
            **{mrx: 0 for mrx in lst_non_hierarchical_mrx_cat}
        )
        df = df.map_partitions(
            lambda pdf: pdf.assign(
                **{
                    mrx: pd.DataFrame(
                        pdf["LST_NDC_MRX"].tolist(), index=pdf.index
                    )
                    .isin([mrx])
                    .any(axis=1)
                    .astype(int)
                    for mrx in lst_non_hierarchical_mrx_cat
                }
            ),
            meta=_meta_mrx_nh,
        )
        # Hierarchical MRX categories: a category is 1 only if present
        # AND no more-severe category of the same hierarchy is present.
        # NOTE: the inner list comprehension deliberately shadows
        # ``level_order``/``mrx``; the slice bound "[: level_order - 1]"
        # is evaluated in the enclosing scope (outer level_order), which
        # selects the more-severe prefix of the hierarchy.
        _meta_mrx_h = df._meta.assign(
            **{
                mrx: 0
                for dct_level in dct_mrx_hierarchies.values()
                for mrx in dct_level.values()
            }
        )
        df = df.map_partitions(
            lambda pdf: pdf.assign(
                **{
                    mrx: (
                        pd.DataFrame(
                            pdf["LST_NDC_MRX"].tolist(),
                            index=pdf.index,
                        )
                        .isin([mrx])
                        .any(axis=1)
                        & (
                            (level_order == 1)
                            | (
                                ~pd.DataFrame(
                                    pdf[
                                        "LST_NDC_MRX"
                                    ].tolist(),
                                    index=pdf.index,
                                )
                                .isin(
                                    [
                                        mrx
                                        for level_order, mrx in sorted(
                                            dct_level.items()
                                        )[
                                            : level_order - 1
                                        ]
                                    ]
                                )
                                .any(axis=1)
                            )
                        )
                    ).astype(int)
                    for level, dct_level in dct_mrx_hierarchies.items()
                    for level_order, mrx in sorted(
                        dct_level.items()
                    )
                }
            ),
            meta=_meta_mrx_h,
        )
        # Drop the intermediate label-list column before returning.
        return (
            df[[col for col in df.columns if col not in ["LST_NDC_MRX"]]],
            lst_mrx_cat_cols,
        )
    @classmethod
    def combine_cdps_mrx_hierarchies(cls, df: dd.DataFrame) -> dd.DataFrame:
        """
        Apply CDPS MRX hierarchical rollups

        Cross-suppresses overlapping CDPS diagnosis categories and MRX
        drug categories: an MRX flag is zeroed when a corresponding
        (more authoritative) CDPS category already fired, and lower
        severity CDPS flags are zeroed when the MRX flag stands.
        Statement order matters throughout — later rules read columns
        rewritten by earlier ones (e.g. CHIVM/CINFM are assigned twice).

        Parameters
        ----------
        df : dask.DataFrame

        Returns
        -------
        dask.DataFrame

        Examples
        --------
        >>> # Requires a dask DataFrame with CDPS and MRX category columns
        >>> df = CdpsRxRiskAdjustment.combine_cdps_mrx_hierarchies(df)  # doctest: +SKIP
        """
        # Cardiovascular: MRX1 suppressed by CARVH/CARM; MRX1 in turn
        # suppresses CARL/CAREL; MRX2 only survives if none of the above.
        df = df.assign(
            MRX1=df["MRX1"].where(
                ~(df[["CARVH", "CARM"]].sum(axis="columns") > 0), 0
            )
        )
        df = df.assign(CARL=df["CARL"].where(~(df["MRX1"] == 1), 0))
        df = df.assign(CAREL=df["CAREL"].where(~(df["MRX1"] == 1), 0))
        df = df.assign(
            MRX2=df["MRX2"].where(
                ~(
                    (df["MRX1"] == 1)
                    | (df[["CARVH", "CARM"]].sum(axis="columns") > 0)
                    | (df[["CARL", "CAREL"]].sum(axis="columns") > 0)
                ),
                0,
            )
        )
        # Psychiatric: any PSY* category suppresses MRX3.
        df = df.assign(
            MRX3=df["MRX3"].where(
                ~(
                    df[["PSYH", "PSYM", "PSYML", "PSYL"]].sum(axis="columns")
                    > 0
                ),
                0,
            )
        )
        # Diabetes: any DIA* category suppresses MRX4.
        df = df.assign(
            MRX4=df["MRX4"].where(
                ~(
                    df[["DIA1H", "DIA1M", "DIA2M", "DIA2L"]].sum(
                        axis="columns"
                    )
                    > 0
                ),
                0,
            )
        )
        # Renal: RENEH/RENVH suppress MRX5; MRX5 suppresses RENM/RENL.
        df = df.assign(
            MRX5=df["MRX5"].where(
                ~(df[["RENEH", "RENVH"]].sum(axis="columns") > 0), 0
            )
        )
        df = df.assign(RENM=df["RENM"].where(~(df["MRX5"] == 1), 0))
        df = df.assign(RENL=df["RENL"].where(~(df["MRX5"] == 1), 0))
        # Hematological: HEMEH suppresses MRX6; MRX6 suppresses the
        # lower severities.
        df = df.assign(MRX6=df["MRX6"].where(~(df["HEMEH"] == 1), 0))
        df = df.assign(HEMVH=df["HEMVH"].where(~(df["MRX6"] == 1), 0))
        df = df.assign(HEMM=df["HEMM"].where(~(df["MRX6"] == 1), 0))
        df = df.assign(HEML=df["HEML"].where(~(df["MRX6"] == 1), 0))
        # HIV/infectious: AIDSH/INFH suppress MRX9; MRX9 suppresses HIVM;
        # MRX7/MRX8 suppressed by AIDSH/INFH or HIVM; INFM/INFL
        # suppressed by any of MRX7/8/9.
        df = df.assign(
            MRX9=df["MRX9"].where(
                ~(df[["AIDSH", "INFH"]].sum(axis="columns") > 0), 0
            )
        )
        df = df.assign(HIVM=df["HIVM"].where(~(df["MRX9"] == 1), 0))
        df = df.assign(
            MRX7=df["MRX7"].where(
                ~(
                    (df[["AIDSH", "INFH"]].sum(axis="columns") > 0)
                    | (df["HIVM"] == 1)
                ),
                0,
            )
        )
        df = df.assign(
            MRX8=df["MRX8"].where(
                ~(
                    (df[["AIDSH", "INFH"]].sum(axis="columns") > 0)
                    | (df["HIVM"] == 1)
                ),
                0,
            )
        )
        df = df.assign(
            INFM=df["INFM"].where(
                ~(df[["MRX7", "MRX8", "MRX9"]].sum(axis="columns") > 0), 0
            )
        )
        df = df.assign(
            INFL=df["INFL"].where(
                ~(df[["MRX7", "MRX8", "MRX9"]].sum(axis="columns") > 0), 0
            )
        )
        # Skin: SKC* categories suppress MRX10.
        df = df.assign(
            MRX10=df["MRX10"].where(
                ~(df[["SKCM", "SKCL", "SKCVL"]].sum(axis="columns") > 0), 0
            )
        )
        # Cancer: CANVH/CANH/CANM suppress MRX11; MRX11 suppresses CANL.
        df = df.assign(
            MRX11=df["MRX11"].where(
                ~(df[["CANVH", "CANH", "CANM"]].sum(axis="columns") > 0), 0
            )
        )
        df = df.assign(CANL=df["CANL"].where(~(df["MRX11"] == 1), 0))
        # CNS: CNSH/CNSM suppress MRX12; MRX12 suppresses CNSL; MRX14
        # and then MRX13 are suppressed by the whole cascade above them.
        df = df.assign(
            MRX12=df["MRX12"].where(
                ~(df[["CNSH", "CNSM"]].sum(axis="columns") > 0), 0
            )
        )
        df = df.assign(CNSL=df["CNSL"].where(~(df["MRX12"] == 1), 0))
        df = df.assign(
            MRX14=df["MRX14"].where(
                ~(
                    (df[["CNSH", "CNSM"]].sum(axis="columns") > 0)
                    | (df["MRX12"] == 1)
                    | (df["CNSL"] == 1)
                ),
                0,
            )
        )
        df = df.assign(
            MRX13=df["MRX13"].where(
                ~(
                    (df[["CNSH", "CNSM"]].sum(axis="columns") > 0)
                    | (df["MRX12"] == 1)
                    | (df["CNSL"] == 1)
                    | (df["MRX14"] == 1)
                ),
                0,
            )
        )
        # Pulmonary: any PUL* category suppresses MRX15.
        df = df.assign(
            MRX15=df["MRX15"].where(
                ~(
                    df[["PULVH", "PULH", "PULM", "PULL"]].sum(axis="columns")
                    > 0
                ),
                0,
            )
        )
        # Child (aid == "DC") interaction rollups.
        # NOTE(review): CCARM/CHEMEH are forced to 1 (not 0) when the
        # corresponding MRX flag fires for DC rows, and CHIVM is first
        # zeroed on MRX9 then forced to 1 on MRX7/MRX8 — the assignment
        # order is load-bearing; verify these against the CDPS+Rx spec.
        df = df.assign(
            CCARM=df["CCARM"].where(
                ~((df["MRX1"] == 1) & (df["aid"] == "DC")), 1
            )
        )
        df = df.assign(
            CHEMEH=df["CHEMEH"].where(
                ~((df["MRX6"] == 1) & (df["aid"] == "DC")), 1
            )
        )
        df = df.assign(
            CHIVM=df["CHIVM"].where(
                ~((df["MRX9"] == 1) & (df["aid"] == "DC")), 0
            )
        )
        df = df.assign(
            CINFM=df["CINFM"].where(
                ~((df["MRX9"] == 1) & (df["aid"] == "DC")), 0
            )
        )
        df = df.assign(
            CHIVM=df["CHIVM"].where(
                ~(
                    (df[["MRX7", "MRX8"]].sum(axis="columns") > 0)
                    & (df["aid"] == "DC")
                ),
                1,
            )
        )
        df = df.assign(
            CINFM=df["CINFM"].where(
                ~(
                    (df[["MRX7", "MRX8"]].sum(axis="columns") > 0)
                    & (df["aid"] == "DC")
                ),
                0,
            )
        )
        return df
[docs] @classmethod def calculate_risk( cls, df: dd.DataFrame, lst_cov: List[str], score_col_name: str = "risk" ) -> dd.DataFrame: """ Calculates CDPS risk adjustment score. The returned dataframe has the following new columns: * risk - CDPS risk score, dtype: float64 Parameters ---------- df : dask.DataFrame lst_cov : list of str List of covariates that CDPS MRX model uses score_col_name : str, default='risk' Returns ------- dask.DataFrame Examples -------- >>> # Requires a dask DataFrame with covariate columns and aid column >>> df = CdpsRxRiskAdjustment.calculate_risk( # doctest: +SKIP ... df, lst_cov=['a_under1', 'a_1_4'], score_col_name='risk') """ dct_weights = ( pd.read_excel( os.path.join(cls.data_folder, "cdps_mrx_con_weighting.xlsx"), sheet_name="CDPSMRX2", ) .fillna(0) .to_dict("records")[0] ) df[score_col_name] = np.nan np_aa_coef = np.array( [dct_weights.get("AA_" + col, 0) for col in lst_cov] ) np_ac_coef = np.array( [dct_weights.get("AC_" + col, 0) for col in lst_cov] ) np_dadc_coef = np.array( [dct_weights.get("DADC_" + col, 0) for col in lst_cov] ) df = df.assign( **dict( [ ( score_col_name, df[score_col_name].where( ~(df["aid"] == "AA"), dct_weights["AA_Intercept"] + (df[list(lst_cov)] * np_aa_coef).sum( axis=1 ), ), ) ] ) ) df = df.assign( **dict( [ ( score_col_name, df[score_col_name].where( ~(df["aid"] == "AC"), dct_weights["AC_Intercept"] + (df[list(lst_cov)] * np_ac_coef).sum( axis=1 ), ), ) ] ) ) df = df.assign( **dict( [ ( score_col_name, df[score_col_name].where( ~((df["aid"] == "DC") | (df["aid"] == "DA")), dct_weights["DADC_Intercept"] + ( df[list(lst_cov)] * np_dadc_coef ).sum(axis=1), ), ) ] ) ) return df[[col for col in df.columns if col not in lst_cov]]
def cdps_rx_risk_adjust(
    df: dd.DataFrame,
    lst_diag_col_name: str = "LST_DIAG_CD",
    lst_ndc_col_name: str = "LST_NDC",
    score_col_name: str = "risk",
) -> dd.DataFrame:
    """
    Calculate CDPS MRX risk adjustment score.

    The input dataframe must already be aggregated to patient level, with
    columns holding comma separated lists of diagnosis codes & NDC codes
    observed during the study period. The pipeline adds the demographic,
    CDPS and MRX covariates, applies the CDPS/MRX cross-hierarchy
    rollups, and finally converts the covariates into a single score
    column.

    Parameters
    ----------
    df : dask.DataFrame
        BENE level Dataframe with diagnosis code & ndc_code list columns
    lst_diag_col_name : str, default='LST_DIAG_CD'
        Name of diagnosis code list column.
    lst_ndc_col_name : str, default='LST_NDC'
        Name of NDC code list column
    score_col_name : str, default='risk'
        CDPS MRX score output column name

    Returns
    -------
    dask.DataFrame

    Examples
    --------
    >>> # Requires a patient-level dask DataFrame with diagnosis and NDC columns
    >>> df = cdps_rx_risk_adjust(df, 'LST_DIAG_CD', 'LST_NDC')  # doctest: +SKIP
    """
    model = CdpsRxRiskAdjustment
    # Each step returns the augmented frame plus the covariate names it added.
    df, lst_demo_cols = model.add_age_and_gender_cols(df)
    df, lst_cdps_cols = model.add_cdps_category_cols(df, lst_diag_col_name)
    df, lst_mrx_cols = model.add_mrx_cat_cols(df, lst_ndc_col_name)
    df = model.combine_cdps_mrx_hierarchies(df)
    lst_cov = lst_demo_cols + lst_cdps_cols + lst_mrx_cols
    return model.calculate_risk(df, lst_cov, score_col_name)