"""This module has TAFPS class which wraps together cleaning/ preprocessing
routines specific for TAF PS files"""
import calendar
import os
from itertools import product
from typing import Optional

import dask.dataframe as dd
import numpy as np
import pandas as pd

from medicaid_utils.adapted_algorithms.py_elixhauser import (
    elixhauser_comorbidity,
)
from medicaid_utils.common_utils import dataframe_utils
from medicaid_utils.preprocessing import taf_file
# Folder with PS-specific reference data shipped alongside this module
data_folder = os.path.join(os.path.dirname(__file__), "data")
# Shared reference datasets (e.g. zip code crosswalks) one package level up
other_data_folder = os.path.join(
    os.path.dirname(os.path.dirname(__file__)), "other_datasets", "data"
)
[docs]
class TAFPS(taf_file.TAFFile):
    """Scripts to preprocess PS (person summary) file."""

    def __init__(
        self,
        year: int,
        state: str,
        data_root: str,
        index_col: str = "BENE_MSIS",
        clean: bool = True,
        preprocess: bool = True,
        rural_method: str = "ruca",
        tmp_folder: Optional[str] = None,
        pq_engine: str = "pyarrow",
    ) -> None:
        """
        Parameters
        ----------
        year : int
            Claim year
        state : str
            Claim state
        data_root : str
            Root folder with cms data
        index_col : str, default='BENE_MSIS'
            Column to use as index. Eg. BENE_MSIS or MSIS_ID. The raw file
            is expected to be already sorted with index column
        clean : bool, default=True
            Run cleaning routines if True
        preprocess : bool, default=True
            Add commonly used constructed variable columns if True
        rural_method : {'ruca', 'rucc'}
            Method to use for rural variable construction. Available
            options: 'ruca', 'rucc'
        tmp_folder : str, default=None
            Folder to use to store temporary files
        pq_engine: str, default='pyarrow'
            Parquet Engine

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
        >>> ps = TAFPS(2019, 'AL', '/data/cms') # doctest: +SKIP
        """
        # clean/preprocess are deliberately deferred here: the parent
        # constructor only loads the file; this class's own clean()/
        # preprocess() are run below so the PS-specific routines apply.
        super().__init__(
            "ps",
            year=year,
            state=state,
            data_root=data_root,
            index_col=index_col,
            clean=False,
            preprocess=False,
            tmp_folder=tmp_folder,
            pq_engine=pq_engine,
        )
        # Default filters to filter out benes that do not meet minimum
        # standard of cleanliness criteria. duplicated_bene_id exclusion will
        # remove benes with duplicated BENE_MSIS ids
        self.dct_default_filters = {"duplicated_bene_id": 0}
        if clean:
            self.clean()
        if preprocess:
            self.preprocess(rural_method)
[docs]
def clean(self) -> None:
"""Runs cleaning routines and creates common exclusion flags based
on default filters.
Examples
--------
>>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
>>> ps = TAFPS(2019, 'AL', '/data/cms', clean=False) # doctest: +SKIP
>>> ps.clean() # doctest: +SKIP
"""
super().clean()
self.add_gender()
self.flag_common_exclusions()
[docs]
def preprocess(
self, rural_method: str = "ruca", add_risk_adjustment_scores: bool = False
) -> None:
"""Adds rural and eligibility criteria indicator variables.
Parameters
----------
rural_method : str, default='ruca'
Method to use for rural classification. Options: 'ruca', 'rucc'.
add_risk_adjustment_scores : bool, default=False
Whether to add Elixhauser risk adjustment scores.
Examples
--------
>>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
>>> ps = TAFPS(2019, 'AL', '/data/cms', preprocess=False) # doctest: +SKIP
>>> ps.preprocess(rural_method='ruca') # doctest: +SKIP
"""
self.flag_rural(rural_method)
self.flag_dual()
self.flag_restricted_benefits()
self.compute_enrollment_gaps()
self.add_mas_boe()
self.flag_tanf()
self.flag_medicaid_enrolled_months()
self.flag_managed_care_months()
self.flag_ffs_months()
if add_risk_adjustment_scores:
self.add_risk_adjustment_scores()
[docs]
    def flag_common_exclusions(self) -> None:
        """
        Adds commonly used exclusion flags

        New Column(s):
            - excl_missing_dob - 0 or 1, 1 when bene's birth_date is missing
            - excl_duplicated_bene_id - 0 or 1, 1 when bene's index column is
              repeated

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
        >>> ps = TAFPS(2019, 'AL', '/data/cms', clean=False) # doctest: +SKIP
        >>> ps.flag_common_exclusions() # doctest: +SKIP
        """
        df_base = self.dct_files["base"]
        # Copy the index into a temporary column so duplicated() can see it
        df_base = df_base.assign(**{f"_{self.index_col}": df_base.index})
        # Some BENE_MSIS's are repeated in PS files. Some patients share the
        # same BENE_ID and yet have different MSIS_IDs. Some of them even
        # have different 'dates of birth'. Since we cannot find any
        # explanation for such patterns, we decided on removing these
        # BENE_MSIS's as per issue #29 in FARA project
        # (https://rcg.bsd.uchicago.edu/gitlab/mmurugesan/hrsa_max_feature_extraction/issues/29)
        # NOTE(review): duplicated() runs inside map_partitions, so it only
        # detects duplicates within a partition — this relies on the file
        # being sorted by the index column so duplicates co-locate; confirm.
        df_base = df_base.map_partitions(
            lambda pdf: pdf.assign(
                excl_missing_dob=pdf["birth_date"].isnull().astype(int),
                excl_duplicated_bene_id=pdf.duplicated(
                    [f"_{self.index_col}"], keep=False
                ).astype(int),
            )
        )
        # Drop the temporary index copy
        df_base = df_base.drop([f"_{self.index_col}"], axis=1)
        self.dct_files["base"] = df_base
        self.cache_results("base")
[docs]
    def add_mas_boe(self) -> None:
        """
        Adds columns denoting number of months in each Maintenance
        Assistance Status (MAS) and Basis of Eligibility
        (BOE) category. Columns added are,

        - boe_chip_months : Number of months in Separate-CHIP BOE category
        - boe_aged_months : Number of months in Aged BOE category
        - boe_blind_disabled_months : Number of months in Blind/Disabled
          BOE category
        - boe_child_months : Number of months in Children BOE category
        - boe_adults_months : Number of months in Adult BOE category
        - boe_breast_and_cervical_cancer_months : Number of months in
          Breast and Cervical Cancer Prevention and Treatment Act of
          2000 BOE category
        - boe_child_of_unemployed_months : Number of months in Child of
          Unemployed Adult BOE category
        - boe_unemployed_months : Number of months in Unemployed Adult
          BOE category
        - boe_foster_care_children_months : Number of months in Foster
          Care Children BOE category
        - boe_unknown_months : Number of months in Unknown BOE category
        - mas_chip_months : Number of months in Separate-CHIP MAS category
        - mas_cash_sec_1931_months : Number of months in Individuals
          receiving cash assistance or eligible under section 1931 of
          the Act MAS category
        - mas_medically_needy_months : Number of months in Medically
          Needy MAS category
        - mas_poverty_months : Number of months in Poverty Related
          Eligibles MAS category
        - mas_other_months : Number of months in Other Eligibles MAS
          category
        - mas_demonstration_months : Number of months in Section 1115
          Demonstration expansion eligible MAS category
        - mas_unknown_months : Number of months in Unknown MAS category
        - max_mas_type : Top MAS category for the bene
        - max_boe_type : Top BOE category for the bene

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
        >>> ps = TAFPS(2019, 'AL', '/data/cms', clean=False, preprocess=False) # doctest: +SKIP
        >>> ps.add_mas_boe() # doctest: +SKIP
        """
        df = self.dct_files["base"]
        # BOE category -> code in the last two digits of MASBOE_CD_XX
        dct_boe_codes = {
            "chip": 0,
            "aged": 1,
            "blind_disabled": 2,
            "child": 4,
            "adults": 5,
            "breast_and_cervical_cancer": 11,
            "child_of_unemployed": 6,
            "unemployed": 7,
            "foster_care_children": 8,
            "unknown": 99,
        }
        # MAS category -> code in the first digit of MASBOE_CD_XX
        dct_mas_codes = {
            "chip": 0,
            "cash_sec_1931": 1,
            "medically_needy": 2,
            "poverty": 3,
            "other": 4,
            "demonstration": 5,
            "unknown": 9,
        }
        # For each of the 12 monthly MASBOE_CD columns the code is coerced
        # to numeric (999 marks missing/unparseable), zero-padded to 3
        # characters, then split: digit 0 is the MAS code and digits 1-2
        # are the BOE code. Each per-month 0/1 match column is stacked and
        # summed across months to get a months-in-category count.
        df = df.map_partitions(
            lambda pdf: pdf.assign(
                **{
                    **{
                        f"boe_{boe_type}_months": np.column_stack(
                            [
                                (
                                    pd.to_numeric(pdf[col], errors="coerce")
                                    .fillna(999)
                                    .astype(int)
                                    .astype(str)
                                    .str.zfill(3)
                                    .str[1:3]
                                    .astype(int)
                                    == boe_code
                                ).astype(int)
                                for col in [
                                    f"MASBOE_CD_{str(mon).zfill(2)}"
                                    for mon in range(1, 13)
                                ]
                            ]
                        )
                        .sum(axis=1)
                        .astype(int)
                        for boe_type, boe_code in dct_boe_codes.items()
                    },
                    **{
                        f"mas_{mas_type}_months": np.column_stack(
                            [
                                (
                                    pd.to_numeric(pdf[col], errors="coerce")
                                    .fillna(999)
                                    .astype(int)
                                    .astype(str)
                                    .str.zfill(3)
                                    .str[0]
                                    .astype(int)
                                    == mas_code
                                ).astype(int)
                                for col in [
                                    f"MASBOE_CD_{str(mon).zfill(2)}"
                                    for mon in range(1, 13)
                                ]
                            ]
                        )
                        .sum(axis=1)
                        .astype(int)
                        for mas_type, mas_code in dct_mas_codes.items()
                    },
                }
            )
        )
        # max_mas_type: category with most months; idxmax returns the
        # column name, and .str[4:].str[:-7] strips the 'mas_' prefix and
        # '_months' suffix to leave the bare category name.
        df = df.assign(
            **{
                "max_mas_type": df[
                    [f"mas_{mas_type}_months" for mas_type in dct_mas_codes]
                ]
                .idxmax(axis=1)
                .str[4:]
                .str[:-7]
            }
        )
        # max_boe_type: same idea for BOE. boe_gt_6_mon / mas_gt_6_mon name
        # the first category with more than 6 months, or NaN when no
        # category exceeds 6 months.
        df = df.assign(
            **{
                "max_boe_type": df[
                    [f"boe_{boe_type}_months" for boe_type in dct_boe_codes]
                ]
                .idxmax(axis=1)
                .str[4:]
                .str[:-7],
                "boe_gt_6_mon": (
                    df[
                        [
                            f"boe_{boe_type}_months"
                            for boe_type in dct_boe_codes
                        ]
                    ]
                    > 6
                )
                .astype(int)
                .idxmax(axis=1)
                .str[4:]
                .str[:-7]
                .where(
                    (
                        df[
                            [
                                f"boe_{boe_type}_months"
                                for boe_type in dct_boe_codes
                            ]
                        ]
                        > 6
                    )
                    .astype(int)
                    .any(axis=1),
                    np.nan,
                ),
                "mas_gt_6_mon": (
                    df[
                        [
                            f"mas_{mas_type}_months"
                            for mas_type in dct_mas_codes
                        ]
                    ]
                    > 6
                )
                .astype(int)
                .idxmax(axis=1)
                .str[4:]
                .str[:-7]
                .where(
                    (
                        df[
                            [
                                f"mas_{mas_type}_months"
                                for mas_type in dct_mas_codes
                            ]
                        ]
                        > 6
                    )
                    .astype(int)
                    .any(axis=1),
                    np.nan,
                ),
            }
        )
        self.dct_files["base"] = df
        self.cache_results("base")
[docs]
def add_gender(self) -> None:
"""Adds integer 'female' column based on 'SEX_CD' column. Undefined
values ('U') in SEX_CD column will result in female column taking
the value -1.
Examples
--------
>>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
>>> ps = TAFPS(2019, 'AL', '/data/cms', clean=False, preprocess=False) # doctest: +SKIP
>>> ps.add_gender() # doctest: +SKIP
>>> 'female' in ps.dct_files['base'].columns # doctest: +SKIP
True
"""
df = self.dct_files["base"]
df = df.map_partitions(
lambda pdf: pdf.assign(
female=np.select(
[
pdf["SEX_CD"].str.strip().str.upper() == "F",
pdf["SEX_CD"].str.strip().str.upper() == "M",
],
[1, 0],
default=-1,
).astype(int)
)
)
self.dct_files["base"] = df
self.cache_results("base")
[docs]
    def flag_rural(
        self, method: str = "ruca"
    ) -> None:
        """
        Classifies benes into rural/ non-rural on the basis of RUCA/ RUCC of
        their resident ZIP/ FIPS codes

        New Columns:
            - resident_state_cd
            - rural - 0/ 1/ -1, 1 when bene's residence is in a rural
              location, 0 when not, -1 when the code is missing/unmatched
            - pcsa - resident PCSA code
            - census_region - resident census region
            - census_division - resident census division
            - {ruca_code/ rucc_code} - resident ruca_code

        This function uses
            - `RUCA 3.1 dataset
              <https://www.ers.usda.gov/data-products/rural-urban-commuting-area-codes/>`_.
              RUCA codes >= 4 denote rural
              and the rest denote urban as per `Cole, Megan B et al
              <https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6286055/#SD1>`_
            - `RUCC codes
              <https://www.ers.usda.gov/data-products/rural-urban-continuum-codes/>`_.
              RUCC codes >= 8 denote
              rural and the rest denote urban.
            - ZCTAs x zipcode crosswalk from `UDSMapper
              <https://udsmapper.org/zip-code-to-zcta-crosswalk/>`_.
            - zipcodes from multiple sources
            - Distance between centroids of zipcodes using
              `NBER data <https://nber.org/distance/2016/gaz/zcta5
              /gaz2016zcta5centroid.csv>`_

        Parameters
        ----------
        method : {'ruca', 'rucc'}
            Method to use for rural variable construction

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
        >>> ps = TAFPS(2019, 'AL', '/data/cms', clean=False, preprocess=False) # doctest: +SKIP
        >>> ps.flag_rural(method='ruca') # doctest: +SKIP
        """
        df = self.dct_files["base"]
        index_col = df.index.name
        zip_folder = os.path.join(other_data_folder, "zip")
        # Keep the index as a regular column so it survives the merges below
        df = df.assign(**{index_col: df.index})
        # Pad zeroes to the left to make zip codes 9 characters long.
        # RI Zip codes have problems. They are all invalid unless the last
        # character is dropped and a zero is added to the left
        df = df.assign(
            BENE_ZIP_CD=df["BENE_ZIP_CD"]
            .where(
                ~((df["STATE_CD"] == "RI")),
                "0" + df["BENE_ZIP_CD"].str.ljust(9, "0").str[:-1],
            )
            .str.ljust(9, "0")
        )
        # zip_state_pcsa_ruca_zcta.csv was constructed with RUCA 3.1
        # (from https://www.ers.usda.gov/data-products/rural-urban-commuting-area-codes/),
        # ZCTAs x zipcode mappings from UDSMapper (https://udsmapper.org/zip-code-to-zcta-crosswalk/),
        # zipcodes from multiple sources, and distance between centroids of
        # zipcodes using NBER data
        # (https://nber.org/distance/2016/gaz/zcta5/gaz2016zcta5centroid.csv)
        df_zip_state_pcsa = pd.read_csv(
            os.path.join(zip_folder, "zip_state_pcsa_ruca_zcta.csv"),
            dtype=object,
        )
        df_census_divisions = pd.read_csv(
            os.path.join(zip_folder, "census_divisions.csv"), dtype=object
        )
        # Normalize crosswalk zips to the same 9-character padded form
        df_zip_state_pcsa = df_zip_state_pcsa.assign(
            zip=df_zip_state_pcsa["zip"].str.replace(" ", "").str.ljust(9, "0")
        )
        df_zip_state_pcsa = df_zip_state_pcsa.merge(
            df_census_divisions, on="state_cd", how="left"
        )
        df_zip_state_pcsa = df_zip_state_pcsa.rename(
            columns={
                "zip": "BENE_ZIP_CD",
                "state_cd": "resident_state_cd",
            }
        )
        # Drop any pre-existing versions of the columns this routine adds,
        # so re-running it does not create duplicate columns
        df = df[
            df.columns.difference(
                [
                    "resident_state_cd",
                    "pcsa",
                    "ruca_code",
                    "census_region",
                    "census_division",
                ]
            ).tolist()
        ]
        # Strip non-alphanumeric characters before matching on zip
        df = df.assign(
            BENE_ZIP_CD=df["BENE_ZIP_CD"]
            .str.replace("[^a-zA-Z0-9]+", "", regex=True)
            .str.ljust(9, "0")
        )
        df = df.merge(
            df_zip_state_pcsa[
                [
                    "BENE_ZIP_CD",
                    "resident_state_cd",
                    "pcsa",
                    "ruca_code",
                    "census_region",
                    "census_division",
                ]
            ],
            how="left",
            on="BENE_ZIP_CD",
        )
        # RUCC codes were downloaded from
        # https://www.ers.usda.gov/data-products/rural-urban-continuum-codes/
        df_rucc = pd.read_excel(
            os.path.join(zip_folder, "ruralurbancodes2013.xls"),
            sheet_name="Rural-urban Continuum Code 2013",
            dtype="object",
        )
        df_rucc = df_rucc.rename(
            columns={
                "State": "resident_state_cd",
                "RUCC_2013": "rucc_code",
                "FIPS": "BENE_CNTY_CD",
            }
        )
        # County FIPS: drop the leading 2-digit state part; RUCC is matched
        # on (county, state) rather than zip
        df_rucc = df_rucc.assign(
            BENE_CNTY_CD=df_rucc["BENE_CNTY_CD"].str.strip().str[2:],
            resident_state_cd=df_rucc["resident_state_cd"]
            .str.strip()
            .str.upper(),
        )
        # Fall back to the claim STATE_CD when the zip merge found no state
        df = df.assign(
            BENE_CNTY_CD=df["BENE_CNTY_CD"].str.strip(),
            resident_state_cd=df["resident_state_cd"].where(
                ~df["resident_state_cd"].isna(), df["STATE_CD"]
            ),
        )
        df = df[[col for col in df.columns if col not in ["rucc_code"]]]
        df = df.merge(
            df_rucc[["BENE_CNTY_CD", "resident_state_cd", "rucc_code"]],
            how="left",
            on=["BENE_CNTY_CD", "resident_state_cd"],
        )
        df = df.assign(
            **{
                col: dd.to_numeric(df[col], errors="coerce")
                for col in ["rucc_code", "ruca_code"]
            }
        )
        # RUCA codes >= 4 denote rural and the rest denote urban
        # as per https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6286055/#SD1
        # and as in FARA year 1 papers
        if method == "ruca":
            # Codes strictly between 0 and 4 -> urban (0); >= 4 -> rural (1);
            # anything else (0, NaN) -> -1 via np.select's default
            df = df.map_partitions(
                lambda pdf: pdf.assign(
                    rural=np.select(
                        [
                            pdf["ruca_code"].between(
                                0, 4, inclusive="neither"
                            ),
                            (pdf["ruca_code"] >= 4),
                        ],
                        [0, 1],
                        default=-1,
                    ).astype(int)
                )
            )
        else:
            # RUCC codes >= 8 denote rural and the rest denote urban
            df = df.map_partitions(
                lambda pdf: pdf.assign(
                    rural=np.select(
                        [
                            pdf["rucc_code"].between(1, 7, inclusive="both"),
                            (pdf["rucc_code"] >= 8),
                        ],
                        [0, 1],
                        default=-1,
                    ).astype(int)
                )
            )
        # Restore the original index lost by the merges
        if df.index.name != index_col:
            df = df.set_index(index_col, sorted=True)
        self.dct_files["base"] = df
        self.cache_results("base")
[docs]
    def flag_dual(self) -> None:
        """
        Flags benes as duals when DUAL_ELGBL_CD is 1 (full dual), 2
        (partial dual), or 3 (other dual) in any month.

        New Columns:
            - any_dual_month - 0/1, 1 when any month was a dual month
            - dual_months - 12-character 0/1 string, one flag per month
            - total_dual_months - count of dual months

        References
        ----------
        - `Identifying beneficiaries with a substance use disorder
          <https://www.medicaid.gov/medicaid/data-and-systems/downloads
          /macbis/sud_techspecs.docx>`_

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
        >>> ps = TAFPS(2019, 'AL', '/data/cms', clean=False, preprocess=False) # doctest: +SKIP
        >>> ps.flag_dual() # doctest: +SKIP
        """
        df = self.dct_files["base"]
        # Per-month 0/1 dual flags (temporary columns, dropped below)
        df = df.assign(
            **{
                f"dual_mon_{mon}": dd.to_numeric(
                    df[f"DUAL_ELGBL_CD_{str(mon).zfill(2)}"], errors="coerce"
                )
                .isin([1, 2, 3])
                .astype(int)
                for mon in range(1, 13)
            }
        )
        # Collapse the monthly flags into any/pattern/total summary columns
        df = df.map_partitions(
            lambda pdf: pdf.assign(
                any_dual_month=pdf[[f"dual_mon_{mon}" for mon in range(1, 13)]]
                .any(axis=1)
                .astype(int),
                dual_months=pdf["dual_mon_1"]
                .astype(str)
                .str.cat(
                    pdf[[f"dual_mon_{mon}" for mon in range(2, 13)]].astype(
                        str
                    ),
                    sep="",
                ),
                total_dual_months=pdf[
                    [f"dual_mon_{mon}" for mon in range(1, 13)]
                ]
                .sum(axis=1)
                .astype(int),
            )
        )
        df = df.drop(columns=[f"dual_mon_{mon}" for mon in range(1, 13)])
        self.dct_files["base"] = df
        self.cache_results("base")
[docs]
    def flag_restricted_benefits(self) -> None:
        """
        Flags beneficiaries whose benefits are restricted. Benes with the
        below values in their RSTRCTD_BNFTS_CD_XX columns are NOT assumed to
        have restricted benefits:

        - 1. Individual is eligible for Medicaid or CHIP and entitled to the
          full scope of Medicaid or CHIP benefits.
        - 4. Individual is eligible for Medicaid or CHIP but only entitled
          to restricted benefits for pregnancy-related services.
        - 5. Individual is eligible for Medicaid or Medicaid-Expansion CHIP
          but, for reasons other than alien, dual-eligibility or
          pregnancy-related status, is only entitled to restricted
          benefits (e.g., restricted benefits based upon substance abuse,
          medically needy or other criteria).
        - 7. Individual is eligible for Medicaid and entitled to Medicaid
          benefits under an alternative package of benchmark-equivalent
          coverage, as enacted by the Deficit Reduction Act of 2005.

        Reference: `Identifying beneficiaries with a substance use disorder
        <https://www.medicaid.gov/medicaid/data-and-systems/downloads/macbis
        /sud_techspecs.docx>`_

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
        >>> ps = TAFPS(2019, 'AL', '/data/cms', clean=False, preprocess=False) # doctest: +SKIP
        >>> ps.flag_restricted_benefits() # doctest: +SKIP
        """
        df = self.dct_files["base"]
        # Per-month 0/1 restricted-benefit flags; a month is restricted when
        # its code is NOT in the allow-list above (NaN also counts as
        # restricted because ~isin(NaN) is True)
        df = df.map_partitions(
            lambda pdf: pdf.assign(
                **{
                    f"restricted_benefit_mon_{mon}": (
                        ~pd.to_numeric(
                            pdf[f"RSTRCTD_BNFTS_CD_{str(mon).zfill(2)}"],
                            errors="coerce",
                        ).isin([1, 4, 5, 7])
                    ).astype(int)
                    for mon in range(1, 13)
                }
            )
        )
        # Collapse the monthly flags into any/pattern/total summary columns
        df = df.map_partitions(
            lambda pdf: pdf.assign(
                any_restricted_benefit_month=pdf[
                    [f"restricted_benefit_mon_{mon}" for mon in range(1, 13)]
                ]
                .any(axis=1)
                .astype(int),
                restricted_benefit_months=pdf["restricted_benefit_mon_1"]
                .astype(str)
                .str.cat(
                    pdf[
                        [
                            f"restricted_benefit_mon_{mon}"
                            for mon in range(2, 13)
                        ]
                    ].astype(str),
                    sep="",
                ),
                total_restricted_benefit_months=pdf[
                    [f"restricted_benefit_mon_{mon}" for mon in range(1, 13)]
                ]
                .sum(axis=1)
                .astype(int),
            )
        )
        # Drop the temporary per-month columns
        df = df.drop(
            columns=[f"restricted_benefit_mon_{mon}" for mon in range(1, 13)]
        )
        self.dct_files["base"] = df
        self.cache_results("base")
[docs]
    def compute_enrollment_gaps(self) -> None:
        """Computes enrollment gaps using dates file. Adds number of
        enrollment gaps and length of maximum enrollment gap in days columns.

        New base-file columns: n_enrollment_gaps, max_enrollment_gap,
        total_gap_in_enrollment (all integer, 0 when no gaps).

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
        >>> ps = TAFPS(2019, 'AL', '/data/cms', clean=False, preprocess=False) # doctest: +SKIP
        >>> ps.compute_enrollment_gaps() # doctest: +SKIP
        """
        df = self.dct_files["dates"]
        df = dataframe_utils.fix_index(
            df, index_name=self.index_col, drop_column=False
        )

        def fill_enrollment_gaps(pdf_dates: pd.DataFrame) -> pd.DataFrame:
            """
            Adds enrollment gap column

            Parameters
            ----------
            pdf_dates : pd.DataFrame
                Dates dataframe

            Returns
            -------
            pd.DataFrame
                Dates dataframe with enrollment gap columns added.

            Examples
            --------
            >>> # Called internally by compute_enrollment_gaps # doctest: +SKIP
            """
            pdf_dates = pdf_dates.reset_index(drop=True)
            pdf_dates = pdf_dates.sort_values(
                [self.index_col, "enrollment_start_date"], ascending=True
            )
            # Collapse spans sharing a start date to the longest one
            pdf_dates = pdf_dates.assign(
                enrollment_end_date=pdf_dates.groupby(
                    [self.index_col, "enrollment_start_date"]
                )["enrollment_end_date"].transform("max")
            )
            pdf_dates = pdf_dates.drop_duplicates(
                [
                    self.index_col,
                    "enrollment_start_date",
                    "enrollment_end_date",
                ]
            )
            # Missing end dates default to Dec 31 of the span's start year
            # (or of the claim year when the start date is also missing)
            pdf_dates = pdf_dates.assign(
                enrollment_end_date=pdf_dates["enrollment_end_date"].fillna(
                    pd.to_datetime(
                        pdf_dates["enrollment_start_date"]
                        .dt.year.fillna(self.year)
                        .astype(str)
                        + "-12-31"
                    )
                )
            )
            # For each span, find the start of the bene's next span; the
            # last span is compared against Dec 31 of its end year
            pdf_dates = pdf_dates.assign(
                next_enrollment_start_date=pdf_dates.groupby(self.index_col)[
                    "enrollment_start_date"
                ]
                .shift(-1)
                .fillna(
                    pd.to_datetime(
                        pdf_dates["enrollment_end_date"]
                        .dt.year.fillna(self.year)
                        .astype(str)
                        + "-12-31"
                    )
                )
            )
            # Gap in days between consecutive spans (can be negative when
            # spans overlap; abs() is applied downstream)
            pdf_dates = pdf_dates.assign(
                enrollment_gap=(
                    pdf_dates["next_enrollment_start_date"]
                    - pdf_dates["enrollment_end_date"]
                ).dt.days
            )
            # Synthesize a leading gap row for benes whose first span starts
            # after Jan 1 of the claim year
            pdf_enrollment_beginnings = pdf_dates.groupby(
                self.index_col
            ).first()
            pdf_enrollment_beginnings = pdf_enrollment_beginnings.loc[
                pdf_enrollment_beginnings["enrollment_start_date"]
                > pd.to_datetime(f"{self.year}-01-01")
            ]
            pdf_enrollment_beginnings = pdf_enrollment_beginnings.assign(
                enrollment_gap=(
                    pdf_enrollment_beginnings["enrollment_start_date"]
                    - pd.to_datetime(f"{self.year}-01-01")
                ).dt.days
            )
            pdf_enrollment_beginnings = pdf_enrollment_beginnings.assign(
                enrollment_end_date=pdf_enrollment_beginnings[
                    "enrollment_start_date"
                ]
            )
            pdf_dates = pd.concat(
                [pdf_enrollment_beginnings.reset_index(drop=False), pdf_dates],
                ignore_index=True,
            )
            pdf_dates = pdf_dates.set_index(self.index_col)
            return pdf_dates

        # NOTE(review): groupby/shift run per-partition; this assumes all
        # rows of a bene land in one partition (file sorted by index).
        df = df.map_partitions(fill_enrollment_gaps)
        self.dct_files["dates"] = df
        self.cache_results("dates")
        # Aggregate nonzero gaps per bene into count/max/total columns
        df_gaps = df.loc[df["enrollment_gap"] != 0]
        df_gaps = df_gaps.map_partitions(
            lambda pdf: pdf.assign(enrollment_gap=pdf["enrollment_gap"].abs())
            .groupby([self.index_col])
            .agg(
                **{
                    "n_enrollment_gaps": ("enrollment_gap", "size"),
                    "max_enrollment_gap": ("enrollment_gap", "max"),
                    "total_gap_in_enrollment": ("enrollment_gap", "sum"),
                }
            )
        ).compute()
        self.dct_files["base"] = self.dct_files["base"].merge(
            df_gaps, left_index=True, right_index=True, how="left"
        )
        # Benes with no gap rows get 0 in every gap column
        self.dct_files["base"] = self.dct_files["base"].assign(
            **{
                col: self.dct_files["base"][col].fillna(0).astype(int)
                for col in [
                    "n_enrollment_gaps",
                    "max_enrollment_gap",
                    "total_gap_in_enrollment",
                ]
            }
        )
        self.cache_results("base")
[docs]
def flag_medicaid_enrolled_months(self) -> None:
"""
Creates flags for medicaid enrollment for each month and computes the
total number of months enrolled in medicaid. Bene has to be enrolled
for all days of the month without missing eligibility information
for the month to be considered a medicaid enrolled month.
Examples
--------
>>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
>>> ps = TAFPS(2019, 'AL', '/data/cms', clean=False, preprocess=False) # doctest: +SKIP
>>> ps.flag_medicaid_enrolled_months() # doctest: +SKIP
"""
df_base = self.dct_files["base"]
df_base = df_base.assign(
**{
f"enrollment_mon_{mon}": (
(
(
dd.to_numeric(
df_base[
f"MDCD_ENRLMT_DAYS_{str(mon).zfill(2)}"
],
errors="coerce",
)
>= (
28
if (mon == 2)
else (
31
if (mon in [1, 3, 5, 7, 8, 10, 12])
else 30
)
)
)
| (
dd.to_numeric(
df_base[
f"CHIP_ENRLMT_DAYS_{str(mon).zfill(2)}"
],
errors="coerce",
)
>= (
28
if (mon == 2)
else (
31
if (mon in [1, 3, 5, 7, 8, 10, 12])
else 30
)
)
)
)
& (
dd.to_numeric(
df_base[
f"MISG_ENRLMT_TYPE_IND_{str(mon).zfill(2)}"
],
errors="coerce",
)
== 0
)
)
.fillna(False)
.astype(int)
for mon in range(1, 13)
}
)
df_base = df_base.map_partitions(
lambda pdf: pdf.assign(
total_enrolled_months=pdf[
[f"enrollment_mon_{mon}" for mon in range(1, 13)]
]
.sum(axis=1)
.astype(int),
enrolled_months=pdf["enrollment_mon_1"]
.astype(str)
.str.cat(
pdf[
[f"enrollment_mon_{mon}" for mon in range(2, 13)]
].astype(str),
sep="",
),
),
meta=df_base._meta.assign(total_enrolled_months=0, enrolled_months=""),
)
df_base = df_base.map_partitions(
lambda pdf: pdf.assign(
max_continuous_enrolment=pdf["enrolled_months"].apply(
lambda x: max(len(s) for s in str(x).split("0"))
)
),
meta=df_base._meta.assign(max_continuous_enrolment=0),
)
df_base = df_base.drop(
columns=[f"enrollment_mon_{mon}" for mon in range(1, 13)]
)
self.dct_files["base"] = df_base
self.cache_results("base")
[docs]
    def flag_managed_care_months(self) -> None:
        """
        Creates flags for 3 categories of managed care plans for each month,
        and adds columns denoting total number of months enrolled in these
        plans and the enrollment sequence pattern.

        Base-file columns added: mc_{comp, behav_health, pccm,
        comp_or_pccm}_months (12-character 0/1 pattern strings),
        total_mc_{...}_months, and max_continuous_mc_{comp,
        comp_or_pccm}_enrollment. When no managed care file exists for this
        state/year, all of these default to zero values.

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
        >>> ps = TAFPS(2019, 'AL', '/data/cms', clean=False, preprocess=False) # doctest: +SKIP
        >>> ps.flag_managed_care_months() # doctest: +SKIP
        """
        if "managed_care" in self.dct_files:
            df_mc = self.dct_files["managed_care"]
            # Coerce all 16 plan slots x 12 months of plan type codes to
            # numeric so the isin() checks below compare numbers, not strings
            df_mc = df_mc.assign(
                **{
                    "MC_PLAN_TYPE_CD_"
                    f"{str(seq).zfill(2)}_"
                    f"{str(mon).zfill(2)}": dd.to_numeric(
                        df_mc[
                            "MC_PLAN_TYPE_CD_"
                            f"{str(seq).zfill(2)}_"
                            f"{str(mon).zfill(2)}"
                        ],
                        errors="coerce",
                    )
                    for seq, mon in product(range(1, 17), range(1, 13))
                }
            )
            # Per-month 0/1 flags per plan category: a month is flagged when
            # any of the 16 plan slots holds a code in that category's set
            df_mc = df_mc.assign(
                **{
                    **{
                        f"mc_comp_mon_{mon}": df_mc[
                            [
                                "MC_PLAN_TYPE_CD_"
                                f"{str(seq).zfill(2)}_"
                                f"{str(mon).zfill(2)}"
                                for seq in range(1, 17)
                            ]
                        ]
                        .isin([1, 4, 80])
                        .any(axis=1)
                        .astype(int)
                        for mon in range(1, 13)
                    },
                    **{
                        f"mc_behav_health_mon_{mon}": df_mc[
                            [
                                "MC_PLAN_TYPE_CD_"
                                f"{str(seq).zfill(2)}_"
                                f"{str(mon).zfill(2)}"
                                for seq in range(1, 17)
                            ]
                        ]
                        .isin([8, 9, 10, 11, 12, 13])
                        .any(axis=1)
                        .astype(int)
                        for mon in range(1, 13)
                    },
                    **{
                        f"mc_pccm_mon_{mon}": df_mc[
                            [
                                "MC_PLAN_TYPE_CD_"
                                f"{str(seq).zfill(2)}_"
                                f"{str(mon).zfill(2)}"
                                for seq in range(1, 17)
                            ]
                        ]
                        .isin([2, 3, 70])
                        .any(axis=1)
                        .astype(int)
                        for mon in range(1, 13)
                    },
                    **{
                        f"mc_comp_or_pccm_mon_{mon}": df_mc[
                            [
                                "MC_PLAN_TYPE_CD_"
                                f"{str(seq).zfill(2)}_"
                                f"{str(mon).zfill(2)}"
                                for seq in range(1, 17)
                            ]
                        ]
                        .isin([1, 4, 80, 2, 3, 70])
                        .any(axis=1)
                        .astype(int)
                        for mon in range(1, 13)
                    },
                }
            )
            # Explicit meta so dask knows the dtypes of the new summary
            # columns (pattern strings and integer totals)
            _mc_meta = df_mc._meta.assign(
                **{f"mc_{mc_type}_months": "" for mc_type in [
                    "comp", "behav_health", "pccm", "comp_or_pccm"]},
                **{f"total_mc_{mc_type}_months": 0 for mc_type in [
                    "comp", "behav_health", "pccm", "comp_or_pccm"]},
            )
            # Collapse monthly flags into 12-character patterns and totals
            df_mc = df_mc.map_partitions(
                lambda pdf: pdf.assign(
                    **{
                        **{
                            f"mc_{mc_type}_months": pdf[f"mc_{mc_type}_mon_1"]
                            .astype(str)
                            .str.cat(
                                pdf[
                                    [
                                        f"mc_{mc_type}_mon_{mon}"
                                        for mon in range(2, 13)
                                    ]
                                ].astype(str),
                                sep="",
                            )
                            for mc_type in [
                                "comp",
                                "behav_health",
                                "pccm",
                                "comp_or_pccm",
                            ]
                        },
                        **{
                            f"total_mc_{mc_type}_months": pdf[
                                [
                                    f"mc_{mc_type}_mon_" f"{mon}"
                                    for mon in range(1, 13)
                                ]
                            ].sum(axis=1)
                            for mc_type in [
                                "comp",
                                "behav_health",
                                "pccm",
                                "comp_or_pccm",
                            ]
                        },
                    }
                ),
                meta=_mc_meta,
            )
            # Longest run of consecutive '1's = max continuous enrollment
            df_mc = df_mc.map_partitions(
                lambda pdf: pdf.assign(
                    **{
                        f"max_continuous_mc_{mc_type}_enrollment": pdf[
                            f"mc_{mc_type}_months"
                        ].apply(lambda x: max(len(s) for s in str(x).split("0")))
                        for mc_type in ["comp", "comp_or_pccm"]
                    }
                ),
                meta=df_mc._meta.assign(
                    **{f"max_continuous_mc_{mc_type}_enrollment": 0
                       for mc_type in ["comp", "comp_or_pccm"]}
                ),
            )
            # Drop the temporary per-month flag columns
            df_mc = df_mc.drop(
                columns=[
                    f"mc_{mc_type}_mon_{mon}"
                    for mc_type, mon in product(
                        ["comp", "behav_health", "pccm", "comp_or_pccm"],
                        range(1, 13),
                    )
                ]
            )
            self.dct_files["managed_care"] = df_mc
            self.cache_results("managed_care")
            # Carry the summary columns over to the base file
            df_base = self.dct_files["base"]
            df_base = df_base.merge(
                df_mc[
                    [
                        f"mc_{mc_type}_months"
                        for mc_type in [
                            "comp",
                            "behav_health",
                            "pccm",
                            "comp_or_pccm",
                        ]
                    ]
                    + [
                        f"total_mc_{mc_type}_months"
                        for mc_type in [
                            "comp",
                            "behav_health",
                            "pccm",
                            "comp_or_pccm",
                        ]
                    ]
                    + [
                        f"max_continuous_mc_{mc_type}_enrollment"
                        for mc_type in ["comp", "comp_or_pccm"]
                    ]
                ].compute(),
                left_index=True,
                right_index=True,
                how="left",
            )
            # Benes absent from the managed care file get zero totals and
            # an all-zero 12-character pattern
            df_base = df_base.assign(
                **{
                    **{
                        col: df_base[col].fillna(0).astype(int)
                        for col in [
                            f"total_mc_{mc_type}_months"
                            for mc_type in [
                                "comp",
                                "behav_health",
                                "pccm",
                                "comp_or_pccm",
                            ]
                        ]
                        + [
                            f"max_continuous_mc_{mc_type}_enrollment"
                            for mc_type in ["comp", "comp_or_pccm"]
                        ]
                    },
                    **{
                        col: df_base[col].fillna("0".zfill(12))
                        for col in [
                            f"mc_{mc_type}_months"
                            for mc_type in [
                                "comp",
                                "behav_health",
                                "pccm",
                                "comp_or_pccm",
                            ]
                        ]
                    },
                }
            )
            self.dct_files["base"] = df_base
            self.cache_results("base")
        else:
            # No managed care file for this state/year: add all-zero defaults
            self.dct_files["base"] = self.dct_files["base"].assign(
                **{
                    **{
                        f"mc_{mc_type}_months": "0".zfill(12)
                        for mc_type in [
                            "comp",
                            "behav_health",
                            "pccm",
                            "comp_or_pccm",
                        ]
                    },
                    **{
                        f"total_mc_{mc_type}_months": 0
                        for mc_type in [
                            "comp",
                            "behav_health",
                            "pccm",
                            "comp_or_pccm",
                        ]
                    },
                    **{
                        f"max_continuous_mc_{mc_type}_enrollment": 0
                        for mc_type in ["comp", "comp_or_pccm"]
                    },
                }
            )
            self.cache_results("base")
[docs]
def flag_tanf(self) -> None:
"""
The Temporary Assistance for Needy Families (TANF) program provides
temporary financial assistance for pregnant women and families with
one or more dependent children. This provides financial assistance to
help pay for food, shelter, utilities, and expenses other than
medical. In TAF files this is identified via
`TANF_CASH_CD:`
- 1: INDIVIDUAL DID NOT RECEIVE TANF BENEFITS DURING THE YEAR;
- 2: INDIVIDUAL DID RECEIVE TANF BENEFITS DURING THE YEAR
Examples
--------
>>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
>>> ps = TAFPS(2019, 'AL', '/data/cms', clean=False, preprocess=False) # doctest: +SKIP
>>> ps.flag_tanf() # doctest: +SKIP
"""
df_base = self.dct_files["base"]
df_base = df_base.assign(
tanf=(
dd.to_numeric(df_base["TANF_CASH_CD"], errors="coerce") == 2
).astype(int)
)
self.dct_files["base"] = df_base
self.cache_results("base")
[docs]
    def gather_bene_level_diag_ndc_codes(self) -> None:
        """Constructs patient level NDC and diagnosis code list columns and
        saves them to individual file.

        Loads the IP, OT and RX claim files for the same year/state, cleans
        their codes, then aggregates comma-separated diagnosis code and NDC
        lists per bene into a 'diag_and_ndc_codes' custom subtype file.

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
        >>> ps = TAFPS(2019, 'AL', '/data/cms', tmp_folder='/tmp/ps') # doctest: +SKIP
        >>> ps.gather_bene_level_diag_ndc_codes() # doctest: +SKIP
        """
        lst_util_claim_types = ["ip", "ot", "rx"]
        # One claim-file object per utilization claim type, sharing this
        # file's year/state/engine settings
        dct_utilization_claims = {
            claim_type: taf_file.TAFFile.get_claim_instance(
                claim_type,
                self.year,
                self.state,
                self.data_root,
                clean=False,
                preprocess=False,
                pq_engine=self.pq_engine,
                tmp_folder=os.path.join(self.tmp_folder, claim_type),
            )
            for claim_type in lst_util_claim_types
        }
        for claim_type in lst_util_claim_types:
            claim_file = dct_utilization_claims[claim_type]
            claim_file.clean_codes()
            claim_file.gather_bene_level_diag_ndc_codes()
        # Diagnosis codes come from IP and OT only (RX has no diagnoses)
        df_diag = dd.concat(
            [
                dct_utilization_claims[claim_type].dct_files["base_diag_codes"]
                for claim_type in ["ip", "ot"]
            ],
            axis=0,
            ignore_index=False,
            interleave_partitions=True,
        )
        # NDC codes come from all three; the divisions check skips files
        # with unknown divisions (presumably empty/unusable — confirm)
        df_ndc = dd.concat(
            [
                dct_utilization_claims[claim_type].dct_files["line_ndc_codes"]
                for claim_type in ["ip", "ot", "rx"]
                if pd.notna(
                    dct_utilization_claims[claim_type]
                    .dct_files["line_ndc_codes"]
                    .divisions[0]
                )
            ],
            axis=0,
            interleave_partitions=True,
            ignore_index=False,
        )
        # Per bene: LST_* columns de-duplicate codes, LST_*_RAW keep all
        df_diag = dataframe_utils.fix_index(df_diag, self.index_col, True)
        df_diag = df_diag.map_partitions(
            lambda pdf: pdf.groupby(pdf.index).agg(
                {
                    "LST_DIAG_CD": lambda x: ",".join(
                        set((",".join([y for y in x if bool(y)])).split(","))
                    ),
                    "LST_DIAG_CD_RAW": lambda x: ",".join(
                        [y for y in x if bool(y)]
                    ),
                }
            )
        )
        df_ndc = dataframe_utils.fix_index(df_ndc, self.index_col, True)
        df_ndc = df_ndc.map_partitions(
            lambda pdf: pdf.groupby(pdf.index).agg(
                {
                    "LST_NDC": lambda x: ",".join(
                        set((",".join([y for y in x if bool(y)])).split(","))
                    ),
                    "LST_NDC_RAW": lambda x: ",".join(
                        [y for y in x if bool(y)]
                    ),
                }
            )
        )
        # NOTE(review): the two calls below discard fix_index's return value,
        # unlike every other call site in this file — likely no-ops; confirm
        # whether fix_index mutates in place or these should be assignments.
        dataframe_utils.fix_index(df_diag, index_name=self.index_col)
        dataframe_utils.fix_index(df_ndc, index_name=self.index_col)
        # Attach the NDC list columns onto the diagnosis frame (index-aligned)
        df_diag = df_diag.assign(
            **{col: df_ndc[col] for col in ["LST_NDC", "LST_NDC_RAW"]}
        )
        self.add_custom_subtype("diag_and_ndc_codes", df_diag)
        self.cache_results("diag_and_ndc_codes")
[docs]
    def add_risk_adjustment_scores(self) -> None:
        """Adds bene level risk adjustment scores. Currently supports
        Elixhauser scores.

        Adds elixhauser_score and ELX_GRP_1..ELX_GRP_31 columns to the base
        file, computing the bene-level diagnosis code lists first if needed.

        Examples
        --------
        >>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
        >>> ps = TAFPS(2019, 'AL', '/data/cms', tmp_folder='/tmp/ps') # doctest: +SKIP
        >>> ps.add_risk_adjustment_scores() # doctest: +SKIP
        """
        # Lazily build the bene-level diagnosis/NDC code lists if absent
        if "diag_and_ndc_codes" not in self.dct_files:
            self.gather_bene_level_diag_ndc_codes()
        df_diag_ndc = self.dct_files["diag_and_ndc_codes"]
        df_diag_ndc = elixhauser_comorbidity.score(
            df_diag_ndc, "LST_DIAG_CD", cms_format="TAF"
        )
        # NOTE(review): fix_index's return value is discarded here, unlike
        # other call sites — likely a no-op; confirm intent.
        dataframe_utils.fix_index(df_diag_ndc, "BENE_MSIS")
        self.dct_files["diag_and_ndc_codes"] = df_diag_ndc
        self.cache_results("diag_and_ndc_codes")
        # Copy the score and the 31 Elixhauser group flags to the base file
        # (index-aligned assignment)
        df_base = self.dct_files["base"]
        df_base = df_base.assign(
            **{
                col: self.dct_files["diag_and_ndc_codes"][col]
                for col in ["elixhauser_score"]
                + ["ELX_GRP_" + str(i) for i in range(1, 32)]
            }
        )
        self.dct_files["base"] = df_base
        self.cache_results("base")
[docs]
def flag_ffs_months(self) -> None:
"""
Creates flags for months enrolled in medicaid without
enrollment in managed care plans of 3 categories, and adds columns
denoting total number of months enrolled in these
plans and the enrollment sequence pattern.
Examples
--------
>>> from medicaid_utils.preprocessing.taf_ps import TAFPS # doctest: +SKIP
>>> ps = TAFPS(2019, 'AL', '/data/cms', clean=False, preprocess=False) # doctest: +SKIP
>>> ps.flag_ffs_months() # doctest: +SKIP
"""
df_base = self.dct_files["base"]
_ffs_meta = df_base._meta.assign(
ffs_months="",
ffs_no_mc_behav_health_months="",
ffs_no_mc_pccm_months="",
ffs_no_mc_comp_or_pccm_months="",
)
df_base = df_base.map_partitions(
lambda pdf: pdf.assign(
**{
"ffs_months": pdf.apply(
lambda x: "".join(
[
str(int((enrl == "1") and (mc == "0")))
for mc, enrl in zip(
x["mc_comp_months"]
if pd.notna(x["mc_comp_months"])
else "0".zfill(12),
x["enrolled_months"]
if pd.notna(x["enrolled_months"])
else "0".zfill(12),
)
]
),
axis=1,
),
"ffs_no_mc_behav_health_months": pdf.apply(
lambda x: "".join(
[
str(int((enrl == "1") and (mc == "0")))
for mc, enrl in zip(
x["mc_behav_health_months"]
if pd.notna(x["mc_behav_health_months"])
else "0".zfill(12),
x["enrolled_months"]
if pd.notna(x["enrolled_months"])
else "0".zfill(12),
)
]
),
axis=1,
),
"ffs_no_mc_pccm_months": pdf.apply(
lambda x: "".join(
[
str(int((enrl == "1") and (mc == "0")))
for mc, enrl in zip(
x["mc_pccm_months"]
if pd.notna(x["mc_pccm_months"])
else "0".zfill(12),
x["enrolled_months"]
if pd.notna(x["enrolled_months"])
else "0".zfill(12),
)
]
),
axis=1,
),
"ffs_no_mc_comp_or_pccm_months": pdf.apply(
lambda x: "".join(
[
str(int((enrl == "1") and (mc == "0")))
for mc, enrl in zip(
x["mc_comp_or_pccm_months"]
if pd.notna(x["mc_comp_or_pccm_months"])
else "0".zfill(12),
x["enrolled_months"]
if pd.notna(x["enrolled_months"])
else "0".zfill(12),
)
]
),
axis=1,
),
}
),
meta=_ffs_meta,
)
df_base = df_base.assign(
**{
f"total_ffs_{ffs_type}months": df_base[f"ffs_{ffs_type}months"]
.str.replace("0", "")
.str.len()
for ffs_type in [
"",
"no_mc_behav_health_",
"no_mc_pccm_",
"no_mc_comp_or_pccm_",
]
}
)
df_base = df_base.map_partitions(
lambda pdf: pdf.assign(
max_continuous_ffs_enrollment=pdf["ffs_months"].apply(
lambda x: max(len(s) for s in str(x).split("0"))
),
max_continuous_ffs_no_mc_comp_or_pccm_enrollment=pdf[
"ffs_no_mc_comp_or_pccm_months"
].apply(lambda x: max(len(s) for s in str(x).split("0"))),
),
meta=df_base._meta.assign(
max_continuous_ffs_enrollment=0,
max_continuous_ffs_no_mc_comp_or_pccm_enrollment=0,
),
)
self.dct_files["base"] = df_base
self.cache_results("base")