Source code for medicaid_utils.adapted_algorithms.py_elixhauser.elixhauser_comorbidity
"""This a python package for computing Elixhauser comorbidity score"""
import os
import pandas as pd
import dask.dataframe as dd
[docs]
class ElixhauserScoring:
    """
    Flag Elixhauser comorbidity groups and sum them into a score.

    The ICD mapping files are read from the ``data`` folder located next to
    this module.
    """

    package_folder, filename = os.path.split(__file__)
    data_folder = os.path.join(package_folder, "data")
    # Number of comorbidity groups; the flag columns are named
    # ELX_GRP_1 .. ELX_GRP_<N_ELX_GROUPS>.
    N_ELX_GROUPS = 31

    @staticmethod
    def _flag_partition(
        pdf: pd.DataFrame, lst_diag_col_name: str, dct_group_prefixes: dict
    ) -> pd.DataFrame:
        """
        Append one 0/1 ``ELX_GRP_<group>`` column per group to a pandas frame.

        Parameters
        ----------
        pdf : pandas.DataFrame
            A single partition holding the diagnosis code list column.
        lst_diag_col_name : str
            Column name containing comma-separated diagnosis codes.
        dct_group_prefixes : dict
            Maps each group number to a tuple of code prefixes.

        Returns
        -------
        pandas.DataFrame
            Copy of ``pdf`` with the flag columns appended.
        """

        def _normalise(col):
            # regex=False: the dot has to be removed as a literal character,
            # whatever the installed pandas uses as its default for `regex`.
            return col.str.replace(".", "", regex=False).str.strip().str.upper()

        def _has_prefix(col, prefixes):
            # na=False: cells padded in by the split (rows with fewer codes)
            # count as "no match" instead of propagating NaN.
            return col.str.startswith(prefixes, na=False)

        # Split and normalise once per partition; every group reuses the
        # same table of codes (one column per position in the list).
        df_codes = (
            pdf[lst_diag_col_name].str.split(",", expand=True).apply(_normalise)
        )
        return pdf.assign(
            **{
                f"ELX_GRP_{grp}": df_codes.apply(_has_prefix, args=(prefixes,))
                .any(axis="columns")
                .astype(int)
                for grp, prefixes in dct_group_prefixes.items()
            }
        )

    @classmethod
    def flag_comorbidities(
        cls, df: "dd.DataFrame", lst_diag_col_name: str, cms_format: str = "MAX"
    ) -> "dd.DataFrame":
        """
        Flag Elixhauser comorbidity groups based on diagnosis codes.

        Adds 31 binary columns (ELX_GRP_1 through ELX_GRP_31) indicating
        the presence of each Elixhauser comorbidity group. A group is flagged
        when any diagnosis code of the row, after removing dots, trimming
        whitespace and upper-casing, starts with one of the group's prefixes.

        The prefixes come from ``icd9_mapping.csv`` or ``icd10_mapping.csv``
        in the data folder: column ``ELX_GRP`` holds the group number and
        column ``ICD9`` / ``ICD10`` a comma-separated list of prefixes.

        Parameters
        ----------
        df : dask.DataFrame
            Bene-level DataFrame with a diagnosis code list column.
        lst_diag_col_name : str
            Column name containing comma-separated diagnosis codes.
        cms_format : {'MAX', 'TAF'}, default='MAX'
            CMS file format. 'MAX' selects the ICD-9 mapping; any other
            value selects the ICD-10 mapping.

        Returns
        -------
        dask.DataFrame
            DataFrame with ELX_GRP columns appended.

        Raises
        ------
        ValueError
            If the mapping file has no row for one of the groups.

        Examples
        --------
        >>> # Requires a dask DataFrame with diagnosis codes
        >>> df = ElixhauserScoring.flag_comorbidities(  # doctest: +SKIP
        ...     df, 'LST_DIAG_CD', cms_format='MAX')
        """
        icd_version = 9 if cms_format == "MAX" else 10
        mapping_path = os.path.join(
            cls.data_folder, f"icd{icd_version}_mapping.csv"
        )
        df_icd_mapping = pd.read_csv(mapping_path)
        code_lists = df_icd_mapping[f"ICD{icd_version}"].str.split(",")

        # Resolve the prefixes of every group once, up front, instead of
        # searching the mapping table again inside every partition.
        dct_group_prefixes = {}
        for grp in range(1, cls.N_ELX_GROUPS + 1):
            matches = code_lists[df_icd_mapping["ELX_GRP"] == grp]
            if matches.empty:
                raise ValueError(
                    f"No ICD-{icd_version} mapping found for Elixhauser "
                    f"group {grp} in {mapping_path}"
                )
            # An empty prefix would match every diagnosis code, so drop
            # empty entries (e.g. produced by a trailing comma).
            dct_group_prefixes[grp] = tuple(
                code for code in matches.iloc[0] if code
            )

        return df.map_partitions(
            lambda pdf: cls._flag_partition(
                pdf, lst_diag_col_name, dct_group_prefixes
            )
        )

    @classmethod
    def calculate_final_score(
        cls, df: "dd.DataFrame", output_column_name: str = "elixhauser_score"
    ) -> "dd.DataFrame":
        """
        Calculate the final Elixhauser comorbidity score.

        Sums the 31 ELX_GRP binary columns into a single integer score.
        The score column is assigned onto ``df`` itself, and the same object
        is returned.

        Parameters
        ----------
        df : dask.DataFrame
            DataFrame with ELX_GRP_1 through ELX_GRP_31 columns.
        output_column_name : str, default='elixhauser_score'
            Name for the output score column.

        Returns
        -------
        dask.DataFrame
            DataFrame with the score column appended.

        Examples
        --------
        >>> # Requires a dask DataFrame with ELX_GRP columns
        >>> df = ElixhauserScoring.calculate_final_score(df)  # doctest: +SKIP
        """
        lst_flag_cols = [
            f"ELX_GRP_{grp}" for grp in range(1, cls.N_ELX_GROUPS + 1)
        ]
        df[output_column_name] = (
            df[lst_flag_cols].sum(axis=1).fillna(0).astype(int)
        )
        return df
[docs]
def score(
    df: dd.DataFrame,
    lst_diag_col_name: str,
    cms_format: str = "MAX",
    output_column_name: str = "elixhauser_score",
) -> dd.DataFrame:
    """
    Compute the Elixhauser score for every bene in the input dataframe.

    The input dataframe should be at bene level, with a column containing
    each bene's comma-separated list of diagnosis codes from the observed
    period. The comorbidity groups are flagged first, then the flags are
    summed into the score column.

    Parameters
    ----------
    df : dask.DataFrame
        Bene level dataframe
    lst_diag_col_name : str
        Column name containing the list of diagnosis codes
    cms_format : {'MAX', 'TAF'}
        CMS file format.
    output_column_name : str, default='elixhauser_score'
        Name of the output score column.

    Returns
    -------
    dask.DataFrame
        Dataframe with the comorbidity flag columns and the score column.

    Examples
    --------
    >>> # Requires a bene-level dask DataFrame with diagnosis codes
    >>> df = score(df, 'LST_DIAG_CD', cms_format='MAX')  # doctest: +SKIP
    """
    df_flagged = ElixhauserScoring.flag_comorbidities(
        df, lst_diag_col_name, cms_format=cms_format
    )
    return ElixhauserScoring.calculate_final_score(
        df_flagged, output_column_name
    )