#!/usr/bin/env python
"""This program creates claim level Potentially Preventable ED visit indicators (Sheryl Davies, Ellen Schultz
et al 2017)."""
__author__ = "Manoradhan Murugesan"
__email__ = "manorathan@uchicago.edu"
import os
from itertools import product
from typing import List
import pandas as pd
import numpy as np
import dask.dataframe as dd
def fix_index(
    df: dd.DataFrame, index_name: str, drop_column: bool = True
) -> dd.DataFrame:
    """
    Return ``df`` indexed by ``index_name``.

    When the frame is already indexed by ``index_name`` and its first
    division is known, no re-indexing is done: the same-named column is
    either removed (``drop_column=True``) or (re)filled from the index
    (``drop_column=False``). Otherwise the index is set from the column,
    creating the column from the current index first when it is missing.

    Parameters
    ----------
    df : dask.DataFrame
        Input dask DataFrame.
    index_name : str
        Name of the column to use as the index.
    drop_column : bool, default=True
        Whether the index column should be absent from the returned
        DataFrame's columns.

    Returns
    -------
    dask.DataFrame
        DataFrame indexed by ``index_name``. Callers must use this return
        value; re-indexing and column removal produce a new object.

    Raises
    ------
    ValueError
        If ``index_name`` is neither a column nor the current index name.

    Notes
    -----
    ``df[index_name] = df.index`` is an item assignment on the object that
    was passed in, so the caller's frame may gain the column as a side
    effect. NOTE(review): this presumes dask item assignment updates the
    frame in place -- confirm against the dask version in use.

    Examples
    --------
    >>> # Requires a dask DataFrame with an 'MSIS_ID' column
    >>> df = fix_index(df, 'MSIS_ID')  # doctest: +SKIP
    """
    # Both conditions are evaluated up front, as in a non-short-circuit test.
    is_named_index = df.index.name == index_name
    has_known_divisions = df.divisions[0] is not None

    if is_named_index and has_known_divisions:
        # Index is already usable; only reconcile the duplicate column.
        if drop_column:
            lst_kept = [name for name in df.columns if name != index_name]
            return df[lst_kept]
        df[index_name] = df.index
        return df

    if index_name not in df.columns:
        if not is_named_index:
            raise ValueError(f"{index_name} not in dataframe")
        # Materialise the index as a column so set_index can consume it.
        df[index_name] = df.index
    return df.set_index(index_name, drop=drop_column)
class EDPreventionQualityIndicators:
    """
    Builders for claim-level Potentially Preventable ED visit indicators.

    ICD-9 code prefixes are read from the ``disease_diagnoses`` and
    ``edvst_types`` sheets of ``data/icd9_codes.xlsx``, located relative to
    this module's folder.
    """

    package_folder, filename = os.path.split(__file__)
    data_folder = os.path.join(package_folder, "data")

    # Conditions flagged when any DIAG_CD* column matches the condition codes
    _LST_ANY_DIAG_CONDITIONS = [
        "facial_trauma",
        "diabetes",
        "immunocompromised",
        "cystic_fibrosis",
        "pneumonia",
        "fever",
        "uti",
        "spinal",
        "trauma",
        "cancer",
    ]
    # Flags that combine the first-listed diagnosis with any-position codes
    _LST_COMBINED_CONDITIONS = [
        "uti_with_ut_malformation",
        "cellulitis_with_diabetes",
    ]
    _LST_AGG_TYPES = ["count", "cost"]

    @classmethod
    def _load_icd9_lookups(cls) -> tuple:
        """Read both code sheets; return (disease, ED visit type) dicts
        mapping each condition to a tuple of code-prefix strings."""
        fname = os.path.join(cls.data_folder, "icd9_codes.xlsx")
        lst_lookup = []
        for sheet_name in ("disease_diagnoses", "edvst_types"):
            pdf_codes = pd.read_excel(fname, sheet_name=sheet_name)
            lst_lookup.append(
                pdf_codes.groupby("condition")["include_icd9"]
                .apply(lambda x: tuple(str(cd) for cd in x))
                .to_dict()
            )
        return tuple(lst_lookup)

    @staticmethod
    def _any_diag_startswith(pdf: pd.DataFrame, codes: tuple) -> pd.Series:
        """Row-wise: does any DIAG_CD* column (upper-cased) start with one
        of ``codes``? Missing codes count as no match."""
        lst_diag_col = [
            col for col in pdf.columns if str(col).startswith("DIAG_CD")
        ]
        return (
            pdf[lst_diag_col]
            .apply(lambda col: col.str.upper().str.startswith(codes, na=False))
            .any(axis="columns")
        )

    @classmethod
    def _flag_disease_diagnoses(
        cls, pdf: pd.DataFrame, dct_conditions: dict, dct_edvst_cat: dict
    ) -> pd.DataFrame:
        """Add 0/1 ``diag_<condition>`` columns to one partition of claims."""
        first_diag = pdf["DIAG_CD_1"].str.upper()
        dct_flag = {
            "diag_" + condn: cls._any_diag_startswith(
                pdf, dct_conditions[condn]
            ).astype(int)
            for condn in cls._LST_ANY_DIAG_CONDITIONS
        }
        # First-listed UTI together with a urinary tract malformation code
        dct_flag["diag_uti_with_ut_malformation"] = (
            first_diag.str.startswith(dct_conditions["uti"], na=False)
            & cls._any_diag_startswith(pdf, dct_conditions["ut_malformation"])
        ).astype(int)
        # First-listed cellulitis together with a diabetes code
        dct_flag["diag_cellulitis_with_diabetes"] = (
            first_diag.str.startswith(
                dct_edvst_cat["acute_acsc_cellulitis"], na=False
            )
            & cls._any_diag_startswith(pdf, dct_conditions["diabetes"])
        ).astype(int)
        return pdf.assign(**dct_flag)

    @staticmethod
    def _agg_conditions_to_patient_level(
        pdf_claims: pd.DataFrame, lst_conditions: List[str]
    ) -> pd.DataFrame:
        """Collapse claim-level flags to one row per MSIS_ID using max."""
        return pdf_claims.groupby("MSIS_ID").agg(
            **{
                "diag_" + condn: ("diag_" + condn, "max")
                for condn in lst_conditions
            }
        )

    @classmethod
    def _claims_to_patient_flags(
        cls,
        df_claims: dd.DataFrame,
        dct_conditions: dict,
        dct_edvst_cat: dict,
        lst_conditions: List[str],
    ) -> pd.DataFrame:
        """Flag one claims file and aggregate it to a pandas patient-level
        frame that carries MSIS_ID both as index and as a column."""
        df_flagged = df_claims.map_partitions(
            lambda pdf: cls._flag_disease_diagnoses(
                pdf, dct_conditions, dct_edvst_cat
            )
        )
        # fix_index returns the re-indexed frame; it must be captured so the
        # per-partition groupby sees MSIS_ID only as the index.
        df_flagged = fix_index(df_flagged, index_name="MSIS_ID")
        pdf_patient = df_flagged.map_partitions(
            lambda pdf: cls._agg_conditions_to_patient_level(
                pdf, lst_conditions
            )
        ).compute()
        if pdf_patient.index.name != "MSIS_ID":
            pdf_patient = pdf_patient.set_index("MSIS_ID")
        pdf_patient["MSIS_ID"] = pdf_patient.index
        return pdf_patient

    @classmethod
    def get_patient_exclusion_indicators(
        cls, df_ip: dd.DataFrame, df_ot: dd.DataFrame, df_ps: dd.DataFrame
    ) -> dd.DataFrame:
        """
        Generate patient-level exclusion indicators for ED PQI measures.

        Flags disease-related diagnoses across IP and OT claims, aggregates
        them to the patient level (max across claims and files), and
        left-merges them onto the ``Female``, ``age`` and ``ageday`` columns
        of the PS (personal summary) file. Indicators missing after the
        merge are filled with 0.

        Parameters
        ----------
        df_ip : dask.DataFrame
            Inpatient claims DataFrame with ``DIAG_CD*`` columns and MSIS_ID.
        df_ot : dask.DataFrame
            Outpatient claims DataFrame with ``DIAG_CD*`` columns and MSIS_ID.
        df_ps : dask.DataFrame
            Personal summary DataFrame with ``Female``, ``age`` and
            ``ageday`` columns. The merge is done on its index.

        Returns
        -------
        dask.DataFrame
            Patient-level DataFrame with ``diag_*`` exclusion indicator
            columns, indexed by MSIS_ID.

        Raises
        ------
        ValueError
            Raised by ``fix_index`` when MSIS_ID is neither a column nor the
            index name of a frame being re-indexed.

        Examples
        --------
        >>> # Requires IP, OT, and PS dask DataFrames with diagnosis columns
        >>> df_ps = EDPreventionQualityIndicators.get_patient_exclusion_indicators(  # doctest: +SKIP
        ...     df_ip, df_ot, df_ps)
        """
        dct_conditions, dct_edvst_cat = cls._load_icd9_lookups()
        lst_conditions = (
            cls._LST_ANY_DIAG_CONDITIONS + cls._LST_COMBINED_CONDITIONS
        )

        lst_pdf_patient = [
            cls._claims_to_patient_flags(
                df_claims, dct_conditions, dct_edvst_cat, lst_conditions
            )
            for df_claims in (df_ip, df_ot)
        ]
        # ignore_index leaves MSIS_ID only as a column, so the groupby below
        # is unambiguous.
        pdf_ps = pd.concat(lst_pdf_patient, ignore_index=True)
        pdf_ps = cls._agg_conditions_to_patient_level(pdf_ps, lst_conditions)
        if pdf_ps.index.name != "MSIS_ID":
            pdf_ps = pdf_ps.set_index("MSIS_ID")
        pdf_ps = pdf_ps[[col for col in pdf_ps.columns if col != "MSIS_ID"]]

        lst_ps_col = ["Female", "age", "ageday"]
        df_ps = df_ps[lst_ps_col].merge(
            pdf_ps, left_index=True, right_index=True, how="left"
        )
        # Patients without any IP/OT claim get 0 for every indicator
        df_ps = df_ps.map_partitions(
            lambda pdf: pdf.assign(
                **{
                    col: pdf[col].fillna(0)
                    for col in pdf.columns
                    if col not in lst_ps_col + ["MSIS_ID"]
                }
            )
        )
        df_ps = fix_index(df_ps, "MSIS_ID")
        return df_ps

    @staticmethod
    def _agg_visits_to_patient_level(
        pdf_claims: pd.DataFrame, lst_condn: List[str], prefix: str
    ) -> pd.DataFrame:
        """Aggregate claim flags/costs to day level (max flag, summed cost)
        and then to patient level (summed days, summed cost)."""
        pdf_day = (
            pdf_claims.groupby(["MSIS_ID", "DATE"])
            .agg(
                **{
                    **{
                        prefix + "edvst_" + condn: ("edvst_" + condn, "max")
                        for condn in lst_condn
                    },
                    **{
                        prefix + "edcost_" + condn: ("edcost_" + condn, "sum")
                        for condn in lst_condn
                    },
                }
            )
            .reset_index(drop=False)
            .set_index("MSIS_ID")
        )
        return pdf_day.groupby("MSIS_ID").agg(
            **{
                **{
                    prefix + "edvst_" + condn + "_count": (
                        prefix + "edvst_" + condn,
                        "sum",
                    )
                    for condn in lst_condn
                },
                **{
                    prefix + "edvst_" + condn + "_cost": (
                        prefix + "edcost_" + condn,
                        "sum",
                    )
                    for condn in lst_condn
                },
            }
        )

    @classmethod
    def _categorise_ed_visits(
        cls, pdf: pd.DataFrame, dct_edvst_cat: dict
    ) -> pd.DataFrame:
        """Flag each ED claim by visit type and aggregate one partition to
        patient-level counts and costs, overall and for valid months."""
        lst_condn = list(dct_edvst_cat.keys())
        # NOTE(review): DIAG_CD_1 is matched here without upper-casing,
        # unlike the exclusion flags -- kept as in the original logic.
        first_diag = pdf["DIAG_CD_1"]
        # na=False: a claim with a missing first-listed diagnosis matches
        # no category instead of breaking astype(int) / where.
        dct_match = {
            condn: first_diag.str.startswith(dct_edvst_cat[condn], na=False)
            for condn in lst_condn
        }
        # Lower respiratory infection visits additionally require a COPD or
        # asthma code in any DIAG_CD* column of the same claim.
        copd_asthma_codes = tuple(
            list(dct_edvst_cat["chronic_acsc_asthma"])
            + list(dct_edvst_cat["chronic_acsc_copd"])
        )
        dct_match["chronic_acsc_lresp"] = dct_match[
            "chronic_acsc_lresp"
        ] & cls._any_diag_startswith(pdf, copd_asthma_codes)

        pdf = pdf.assign(
            **{
                **{
                    "edvst_" + condn: match.astype(int)
                    for condn, match in dct_match.items()
                },
                **{
                    "edcost_" + condn: pdf["pymt_amt"].where(match, 0)
                    for condn, match in dct_match.items()
                },
            }
        )

        # Subsetting claims that meet months restriction
        pdf_months_restricted = pdf.loc[pdf["valid_month"] == 1]

        pdf_combined = cls._agg_visits_to_patient_level(
            pdf, lst_condn, ""
        ).merge(
            cls._agg_visits_to_patient_level(
                pdf_months_restricted, lst_condn, "valid_"
            ),
            left_index=True,
            right_index=True,
            how="left",
        )
        if pdf_combined.index.name != "MSIS_ID":
            pdf_combined = pdf_combined.set_index("MSIS_ID")
        return pdf_combined

    @classmethod
    def _apply_pqi_exclusions(
        cls, pdf: pd.DataFrame, lst_claimtype: List[str], lst_condn: List[str]
    ) -> pd.DataFrame:
        """Create ``pqi_ed_*`` columns by blanking (NaN) visit counts/costs
        of patients who fail each measure's age/diagnosis criteria, then
        add chronic and acute ACSC totals."""
        lst_agg = cls._LST_AGG_TYPES
        lst_chronic = [c for c in lst_condn if c.startswith("chronic_acsc")]
        lst_acute = [c for c in lst_condn if c.startswith("acute_acsc")]

        keep_dental = (pdf["age"] >= 5) & (pdf["diag_facial_trauma"] == 0)
        keep_chronic = pdf["age"] >= 40
        female_18_34_uti_malformation = (
            (pdf["Female"] == 1)
            & pdf["age"].between(18, 34, inclusive="both")
            & (pdf["diag_uti_with_ut_malformation"] == 1)
        )
        keep_acute = (
            (pdf["ageday"] >= 90)
            & (pdf["age"] < 65)
            & (pdf["diag_immunocompromised"] == 0)
            & ~female_18_34_uti_malformation
            & (pdf["diag_cellulitis_with_diabetes"] == 0)
        )
        keep_asthma = (
            pdf["age"].between(5, 39, inclusive="both")
            & (pdf["diag_cystic_fibrosis"] == 0)
            & (pdf["diag_pneumonia"] == 0)
        )
        keep_back_pain = (
            (pdf["age"] >= 18)
            & (pdf["diag_trauma"] == 0)
            & (pdf["diag_uti"] == 0)
            & (pdf["diag_fever"] == 0)
            & (pdf["diag_cancer"] == 0)
            & (pdf["diag_spinal"] == 0)
        )

        dct_pqi = {}
        for claim_type, agg_type in product(lst_claimtype, lst_agg):
            dct_pqi[claim_type + "pqi_ed_dental_" + agg_type] = pdf[
                claim_type + "edvst_dental_" + agg_type
            ].where(keep_dental, np.nan)
        for keep, lst_group in (
            (keep_chronic, lst_chronic),
            (keep_acute, lst_acute),
        ):
            for claim_type, agg_type, condn in product(
                lst_claimtype, lst_agg, lst_group
            ):
                dct_pqi[claim_type + "pqi_ed_" + condn + "_" + agg_type] = pdf[
                    claim_type + "edvst_" + condn + "_" + agg_type
                ].where(keep, np.nan)
        for label, source, keep in (
            ("asthma", "chronic_acsc_asthma", keep_asthma),
            ("back_pain", "back_pain", keep_back_pain),
        ):
            for claim_type, agg_type in product(lst_claimtype, lst_agg):
                dct_pqi[claim_type + "pqi_ed_" + label + "_" + agg_type] = pdf[
                    claim_type + "edvst_" + source + "_" + agg_type
                ].where(keep, np.nan)
        pdf = pdf.assign(**dct_pqi)

        # min_count=1 keeps the total NaN when every component is NaN
        return pdf.assign(
            **{
                claim_type
                + "pqi_ed_"
                + condn_cat
                + "_acsc_"
                + agg_type: pdf[
                    [
                        claim_type + "pqi_ed_" + condn + "_" + agg_type
                        for condn in lst_condn
                        if condn.startswith(condn_cat + "_acsc")
                    ]
                ].sum(axis="columns", min_count=1)
                for claim_type, agg_type, condn_cat in product(
                    lst_claimtype, lst_agg, ["chronic", "acute"]
                )
            }
        )

    @classmethod
    def flag_potentially_preventable_ed_visits(
        cls, df_ed: dd.DataFrame, df_ps: dd.DataFrame,
        months_restricted: bool = False,
    ) -> dd.DataFrame:
        """
        Flag potentially preventable ED visits using PQI criteria.

        Categorizes ED visits by the first-listed diagnosis into the visit
        types of the ``edvst_types`` sheet, aggregates counts (distinct
        MSIS_ID/DATE days) and costs to the patient level, inner-merges
        them with ``df_ps`` and applies age and diagnosis-based exclusions.

        Parameters
        ----------
        df_ed : dask.DataFrame
            ED claims DataFrame with ``DIAG_CD*``, ``DATE`` and ``pymt_amt``
            columns and MSIS_ID. ``valid_month`` is required when
            ``months_restricted`` is true. The caller's frame is not
            modified.
        df_ps : dask.DataFrame
            Patient-level DataFrame with ``Female``, ``age``, ``ageday`` and
            the ``diag_*`` exclusion indicators; merged on its index.
        months_restricted : bool, default=False
            When true, ``valid_``-prefixed measures computed from claims
            with ``valid_month == 1`` are returned in addition to the
            unrestricted ones. When false, ``valid_month`` is set to 1 for
            all claims and only unrestricted measures are returned.

        Returns
        -------
        dask.DataFrame
            Patient-level DataFrame with an ``MSIS_ID`` column and
            ``pqi_ed_<measure>_count`` / ``pqi_ed_<measure>_cost`` columns
            for dental, chronic_acsc, acute_acsc, asthma and back_pain.

        Raises
        ------
        ValueError
            Raised by ``fix_index`` when MSIS_ID is neither a column nor the
            index name of a frame being re-indexed.

        Examples
        --------
        >>> # Requires ED and PS dask DataFrames with required columns
        >>> df_ed = EDPreventionQualityIndicators.flag_potentially_preventable_ed_visits(  # doctest: +SKIP
        ...     df_ed, df_ps, months_restricted=False)
        """
        _, dct_edvst_cat = cls._load_icd9_lookups()
        lst_condn = list(dct_edvst_cat.keys())

        lst_claimtype = [""]
        if months_restricted:
            lst_claimtype.append("valid_")
        else:
            # assign() builds a new frame, leaving the caller's df_ed intact
            df_ed = df_ed.assign(valid_month=1)

        # Index by MSIS_ID (column dropped) so per-partition groupbys are
        # unambiguous; the returned frame must be captured.
        df_ed = fix_index(df_ed, "MSIS_ID")
        df_ed = df_ed.map_partitions(
            lambda pdf: cls._categorise_ed_visits(pdf, dct_edvst_cat)
        )
        df_ed = fix_index(df_ed, "MSIS_ID", drop_column=False)
        pdf_ed = df_ed.compute()

        df_ed = df_ps.merge(
            pdf_ed, left_index=True, right_index=True, how="inner"
        )
        df_ed = df_ed.map_partitions(
            lambda pdf: pdf.assign(
                **{
                    col: pdf[col].fillna(0)
                    for col in pdf.columns
                    if col.startswith("edvst") or col.startswith("valid_edvst")
                }
            )
        )
        df_ed = df_ed.map_partitions(
            lambda pdf: cls._apply_pqi_exclusions(
                pdf, lst_claimtype, lst_condn
            )
        )
        # Keep the MSIS_ID column: it is part of the returned selection.
        df_ed = fix_index(df_ed, "MSIS_ID", drop_column=False)
        df_ed = df_ed[
            ["MSIS_ID"]
            + [
                claim_type + "pqi_ed_" + condn + "_" + agg_type
                for claim_type, condn, agg_type in product(
                    lst_claimtype,
                    [
                        "dental",
                        "chronic_acsc",
                        "acute_acsc",
                        "asthma",
                        "back_pain",
                    ],
                    cls._LST_AGG_TYPES,
                )
            ]
        ]
        return df_ed
def get_ed_pqis(
    df_ip: dd.DataFrame, df_ot: dd.DataFrame, df_ps: dd.DataFrame,
    df_ed: dd.DataFrame, restrict_months: bool = False,
) -> dd.DataFrame:
    """
    Run the two-step ED Prevention Quality Indicator pipeline.

    First builds the patient-level exclusion indicators from the IP, OT
    and PS files, then passes them with the ED claims to the
    preventable-visit flagging step and returns its result.

    Parameters
    ----------
    df_ip : dask.DataFrame
        Inpatient claims DataFrame.
    df_ot : dask.DataFrame
        Outpatient claims DataFrame.
    df_ps : dask.DataFrame
        Personal summary DataFrame.
    df_ed : dask.DataFrame
        ED claims DataFrame.
    restrict_months : bool, default=False
        Forwarded as the ``months_restricted`` argument of
        ``flag_potentially_preventable_ed_visits``.

    Returns
    -------
    dask.DataFrame
        Output of
        ``EDPreventionQualityIndicators.flag_potentially_preventable_ed_visits``.

    Examples
    --------
    >>> # Requires IP, OT, PS, and ED dask DataFrames
    >>> df_ed_pqi = get_ed_pqis(df_ip, df_ot, df_ps, df_ed)  # doctest: +SKIP
    """
    pqi_builder = EDPreventionQualityIndicators
    # Step 1: exclusion indicators per patient
    df_patient_flags = pqi_builder.get_patient_exclusion_indicators(
        df_ip, df_ot, df_ps
    )
    # Step 2: ED visit measures with exclusions applied
    return pqi_builder.flag_potentially_preventable_ed_visits(
        df_ed, df_patient_flags, restrict_months
    )