"""This module has MAXPS class which wraps together cleaning/ preprocessing
routines specific for MAX PS files"""
import os
from typing import Optional
import numpy as np
import pandas as pd
import dask.dataframe as dd
from medicaid_utils.preprocessing import max_file
data_folder = os.path.join(os.path.dirname(__file__), "data")
other_data_folder = os.path.join(
os.path.dirname(os.path.dirname(__file__)), "other_datasets", "data"
)
[docs]
class MAXPS(max_file.MAXFile):
"""Scripts to preprocess PS file"""
def __init__(
self,
year: int,
state: str,
data_root: str,
index_col: str = "BENE_MSIS",
clean: bool = True,
preprocess: bool = True,
rural_method: str = "ruca",
tmp_folder: Optional[str] = None,
pq_engine: str = "pyarrow",
) -> None:
"""
Initializes PS file object by preloading and preprocessing(if opted
in) the file
Parameters
----------
year : int
Year of claim file
state : str
State of claim file
data_root : str
Root folder of raw claim files
index_col : str, default='BENE_MSIS'
Index column name. Eg. BENE_MSIS or MSIS_ID. The raw file is
expected to be already
sorted with index column
clean : bool, default=True
Should the associated files be cleaned?
preprocess : bool, default=True
Should the associated files be preprocessed?
rural_method : {'ruca', 'rucc'}
Method to use for rural variable construction
tmp_folder : str, default=None
Folder location to use for caching intermediate results. Can be
turned off by not passing this argument.
pq_engine : str, default='pyarrow'
Parquet engine to use
"""
super().__init__(
"ps",
year=year,
state=state,
data_root=data_root,
index_col=index_col,
clean=False,
preprocess=False,
tmp_folder=tmp_folder,
pq_engine=pq_engine,
)
# Default filters to filter out benes that do not meet minimum
# standard of cleanliness criteria duplicated_bene_id exclusion will
# remove benes with duplicated BENE_MSIS ids
self.dct_default_filters = {"duplicated_bene_id": 0}
if clean:
self.clean()
if preprocess:
self.preprocess(rural_method)
[docs]
def clean(self) -> None:
"""Runs cleaning routines and adds common exclusion flags based on
default filters"""
super().clean()
self.flag_common_exclusions()
self.cache_results()
[docs]
def preprocess(
self, rural_method: str = "ruca"
) -> None:
"""
Adds rural, eligibility criteria, dual, and restricted benefits
indicator variables
Parameters
----------
rural_method : {'ruca', 'rucc'}
Method to use for rural variable construction
"""
self.flag_rural(rural_method)
self.add_eligibility_status_columns()
self.flag_duals()
self.flag_restricted_benefits()
self.flag_tanf()
self.cache_results()
[docs]
def flag_common_exclusions(self) -> None:
"""
Adds exclusion flags
New Column(s):
- excl_duplicated_bene_id - 0 or 1, 1 when bene's index column
is repeated
"""
self.df = self.df.assign(
**dict([(f"_{self.index_col}", self.df.index)])
)
# Some BENE_MSIS's are repeated in PS files. Some patients share the
# same BENE_ID and yet have different MSIS_IDs. Some of them even
# have different 'dates of birth'. Since we cannot find any
# explanation for such patterns, we decided on removing these
# BENE_MSIS's as per issue #29 in FARA project
# (https://rcg.bsd.uchicago.edu/gitlab/mmurugesan/hrsa_max_feature_extraction/issues/29)
self.df = self.df.map_partitions(
lambda pdf: pdf.assign(
excl_duplicated_bene_id=pdf.duplicated(
[f"_{self.index_col}"], keep=False
).astype(int)
)
)
self.df = self.df.drop([f"_{self.index_col}"], axis=1)
[docs]
def flag_duals(self) -> None:
"""
Flags dual patients
New column(s):
dual - 0 or 1 column, 0 if 0 <= EL_MDCR_DUAL_ANN <= 9 for years
2007, 2009, 2011 0 <= EL_MDCR_DUAL_ANN <= 9 for other years
"""
self.df = self.df.assign(
dual=(
~dd.to_numeric(
self.df[
"EL_MDCR_DUAL_ANN"
if (self.year in [2007, 2009, 2011])
else "EL_MDCR_ANN_XOVR_99"
],
errors="coerce",
).between(0, 9, inclusive="both")
).astype(int)
)
[docs]
def flag_rural(
self, method: str = "ruca"
) -> None:
"""
Classifies benes into rural/ non-rural on the basis of RUCA/ RUCC of
their resident ZIP/ FIPS codes
New Columns:
- resident_state_cd
- rural - 0/ 1/ -1, 1 when bene's residence is in a rural
location, 0 when not. -1 when zip code is missing
- pcsa - resident PCSA code
- {ruca_code/ rucc_code} - resident ruca_code
This function uses
- RUCA 3.1 dataset (from
https://www.ers.usda.gov/data-products/rural-urban-commuting-area-codes/).
RUCA codes >= 4 denote rural and the rest denote urban as per
https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6286055/#SD1
- RUCC codes were downloaded from
https://www.ers.usda.gov/data-products/rural-urban-continuum-codes/.
RUCC codes >= 8 denote rural and the rest denote urban.
- ZCTAs x zipcode crosswalk from UDSMapper
(https://udsmapper.org/zip-code-to-zcta-crosswalk/),
- zipcodes from multiple sources
- distance between centroids of zipcodes using NBER data
(https://nber.org/distance/2016/gaz/zcta5/gaz2016zcta5centroid.csv)
Parameters
----------
method : {'ruca', 'rucc'}
Method to use for rural variable construction
"""
index_col = self.df.index.name
zip_folder = os.path.join(other_data_folder, "zip")
self.df = self.df.assign(**dict([(index_col, self.df.index)]))
# 2012 RI claims report zip codes have problems. They are all
# invalid unless the last character is dropped. So
# dropping it as per email exchange with Alex Knitter & Dr.
# Laiteerapong (May 2020)
self.df = self.df.assign(
EL_RSDNC_ZIP_CD_LTST=self.df["EL_RSDNC_ZIP_CD_LTST"].where(
~((self.df["STATE_CD"] == "RI") & (self.df["year"] == 2012)),
self.df["EL_RSDNC_ZIP_CD_LTST"].str[:-1],
)
)
# zip_state_pcsa_ruca_zcta.csv was constructed with RUCA 3.1
# (from https://www.ers.usda.gov/data-products/rural-urban-commuting-area-codes/),
# ZCTAs x zipcode mappings from UDSMapper (https://udsmapper.org/zip-code-to-zcta-crosswalk/),
# zipcodes from multiple sources, and distance between centroids of zipcodes using NBER data
# (https://nber.org/distance/2016/gaz/zcta5/gaz2016zcta5centroid.csv)
df_zip_state_pcsa = pd.read_csv(
os.path.join(zip_folder, "zip_state_pcsa_ruca_zcta.csv"),
dtype=object,
)
df_zip_state_pcsa = df_zip_state_pcsa.assign(
zip=df_zip_state_pcsa["zip"].str.replace(" ", "").str.zfill(9)
)
df_zip_state_pcsa = df_zip_state_pcsa.rename(
columns={
"zip": "EL_RSDNC_ZIP_CD_LTST",
"state_cd": "resident_state_cd",
}
)
self.df = self.df[
self.df.columns.difference(
["resident_state_cd", "pcsa", "ruca_code"]
).tolist()
]
self.df = self.df.assign(
EL_RSDNC_ZIP_CD_LTST=self.df["EL_RSDNC_ZIP_CD_LTST"]
.str.replace(" ", "")
.str.zfill(9)
)
self.df = self.df.merge(
df_zip_state_pcsa[
[
"EL_RSDNC_ZIP_CD_LTST",
"resident_state_cd",
"pcsa",
"ruca_code",
]
],
how="left",
on="EL_RSDNC_ZIP_CD_LTST",
)
# RUCC codes were downloaded from
# https://www.ers.usda.gov/data-products/rural-urban-continuum-codes/
df_rucc = pd.read_excel(
os.path.join(zip_folder, "ruralurbancodes2013.xls"),
sheet_name="Rural-urban Continuum Code 2013",
dtype="object",
)
df_rucc = df_rucc.rename(
columns={
"State": "resident_state_cd",
"RUCC_2013": "rucc_code",
"FIPS": "EL_RSDNC_CNTY_CD_LTST",
}
)
df_rucc = df_rucc.assign(
EL_RSDNC_CNTY_CD_LTST=df_rucc["EL_RSDNC_CNTY_CD_LTST"]
.str.strip()
.str[2:],
resident_state_cd=df_rucc["resident_state_cd"]
.str.strip()
.str.upper(),
)
self.df = self.df.assign(
EL_RSDNC_CNTY_CD_LTST=self.df["EL_RSDNC_CNTY_CD_LTST"].str.strip(),
resident_state_cd=self.df["resident_state_cd"].where(
~self.df["resident_state_cd"].isna(), self.df["STATE_CD"]
),
)
self.df = self.df[
[col for col in self.df.columns if col not in ["rucc_code"]]
]
self.df = self.df.merge(
df_rucc[
["EL_RSDNC_CNTY_CD_LTST", "resident_state_cd", "rucc_code"]
],
how="left",
on=["EL_RSDNC_CNTY_CD_LTST", "resident_state_cd"],
)
self.df = self.df.assign(
**{
col: dd.to_numeric(self.df[col], errors="coerce")
for col in ["rucc_code", "ruca_code"]
}
)
# RUCA codes >= 4 denote rural and the rest denote urban
# as per https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6286055/#SD1
# and as in FARA year 1 papers
if method == "ruca":
self.df = self.df.map_partitions(
lambda pdf: pdf.assign(
rural=np.select(
[
pdf["ruca_code"].between(
0, 4, inclusive="neither"
),
(pdf["ruca_code"] >= 4),
],
[0, 1],
default=-1,
).astype(int)
)
)
else:
# RUCC codes >= 8 denote rural and the rest denote urban
self.df = self.df.map_partitions(
lambda pdf: pdf.assign(
rural=np.select(
[
pdf["rucc_code"].between(1, 7, inclusive="both"),
(pdf["rucc_code"] >= 8),
],
[0, 1],
default=-1,
).astype(int)
)
)
if self.df.index.name != index_col:
self.df = self.df.set_index(index_col, sorted=True)
[docs]
def add_eligibility_status_columns(self) -> None:
"""
Add eligibility columns based on MAX_ELG_CD_MO_{month} values for
each month.
MAX_ELG_CD_MO:00 = NOT ELIGIBLE, 99 = UNKNOWN ELIGIBILITY => codes
to denote ineligibility
New Column(s):
- elg_mon_{month} - 0 or 1 value column, denoting eligibility
for each month
- total_elg_mon - No. of eligible months
- elg_full_year - 0 or 1 value column, 1 if total_elg_mon = 12
- elg_over_9mon - 0 or 1 value column, 1 if total_elg_mon >= 9
- elg_over_6mon - 0 or 1 value column, 1 if total_elg_mon >= 6
- elg_cont_6mon - 0 or 1 value column, 1 if patient has 6
continuous eligible months
- mas_elg_change - 0 or 1 value column, 1 if patient had
multiple mas group memberships during claim year
- mas_assignments - comma separated list of MAS assignments
- boe_assignments - comma separated list of BOE assignments
- dominant_boe_group - BOE status held for the most number of
months
- boe_elg_change - 0 or 1 value column, 1 if patient had
multiple boe group memberships during claim year
- child_boe_elg_change - 0 or 1 value column, 1 if patient had
multiple boe group memberships during claim year
- elg_change - 0 or 1 value column, 1 if patient had multiple
eligibility group memberships during claim year
- eligibility_aged - Eligibility as aged anytime during the
claim year
- eligibility_child - Eligibility as child anytime during the
claim year
- max_gap - Maximum gap in enrollment in months
- max_cont_enrollment - Maximum duration of continuous enrollment
"""
# MAS & BOE groups are arrived at from MAX_ELG_CD variable
# (https://resdac.org/sites/datadocumentation.resdac.org/files/MAX%20UNIFORM%20ELIGIBILITY%20CODE%20TABLE.txt)
# FARA year 2 peds paper decided to collapse BOE assignments,
# combining disabled and child groups
def _assign_mas(pdf):
mas_cols = {}
for mon in range(1, 13):
mas_cols[f"MAS_ELG_MON_{mon}"] = (
pdf[f"MAX_ELG_CD_MO_{mon}"]
.where(
~(
pdf[f"MAX_ELG_CD_MO_{mon}"]
.str.strip()
.isin(["00", "99", "", "."])
| pdf[f"MAX_ELG_CD_MO_{mon}"].isna()
),
"99",
)
.astype(str)
.str.strip()
.str[:1]
)
pdf = pdf.assign(**mas_cols)
mas_df = pdf[[f"MAS_ELG_MON_{mon}" for mon in range(1, 13)]]
return pdf.assign(
mas_elg_change=(
mas_df.replace("9", np.nan)
.nunique(axis=1, dropna=True)
> 1
).astype(int),
mas_assignments=mas_df.replace("9", "").apply(
lambda x: ",".join(set(",".join(x).split(","))),
axis=1,
),
)
self.df = self.df.map_partitions(_assign_mas)
def _assign_boe(pdf):
boe_cols = {}
child_boe_cols = {}
for mon in range(1, 13):
# Convert once per month, reuse for all conditions
elg_str = pdf[f"MAX_ELG_CD_MO_{mon}"].astype(str)
boe_cols[f"BOE_ELG_MON_{mon}"] = np.select(
[
elg_str.isin(["11", "21", "31", "41"]), # aged
elg_str.isin(["12", "22", "32", "42"]),
# blind / disabled
elg_str.isin(
["14", "16", "24", "34", "44", "48"]
), # child
elg_str.isin(
["15", "17", "25", "35", "3A", "45"]
), # adult
elg_str.isin(["51", "52", "54", "55"]),
],
# state demonstration EXPANSION
[1, 2, 3, 4, 5],
default=6,
)
child_boe_cols[f"CHILD_BOE_ELG_MON_{mon}"] = np.select(
[
elg_str.isin(["11", "21", "31", "41"]), # aged
elg_str.isin(
[
"12", "22", "32", "42",
"14", "16", "24", "34", "44", "48",
]
),
# blind / disabled OR CHILD
elg_str.isin(
["15", "17", "25", "35", "3A", "45"]
), # adult
elg_str.isin(["51", "52", "54", "55"]),
],
# state demonstration EXPANSION
[1, 2, 3, 4],
default=5,
)
pdf = pdf.assign(**boe_cols, **child_boe_cols)
boe_df = pdf[[f"BOE_ELG_MON_{m}" for m in range(1, 13)]]
child_boe_df = pdf[
[f"CHILD_BOE_ELG_MON_{m}" for m in range(1, 13)]
]
return pdf.assign(
boe_elg_change=(
boe_df.replace(6, np.nan)
.nunique(axis=1, dropna=True)
> 1
).astype(int),
child_boe_elg_change=(
child_boe_df.replace(5, np.nan)
.nunique(axis=1, dropna=True)
> 1
).astype(int),
boe_assignments=boe_df.astype(str).apply(
lambda x: ",".join(set(",".join(x).split(","))),
axis=1,
),
dominant_boe_grp=boe_df.replace(6, np.nan)
.assign(empty_boe_elg=6)
.mode(axis=1, dropna=True)[0]
.fillna(6)
.astype(int),
)
self.df = self.df.map_partitions(_assign_boe)
self.df = self.df.assign(
elg_change=(
self.df[["boe_elg_change", "mas_elg_change"]].sum(axis=1) > 0
).astype(int),
eligibility_aged=(
self.df[[f"BOE_ELG_MON_{mon}" for mon in range(1, 13)]] == 1
).any(axis="columns"),
eligibility_child=(
self.df[[f"BOE_ELG_MON_{mon}" for mon in range(1, 13)]] == 3
).any(axis="columns"),
)
self.df = self.df.map_partitions(
lambda pdf: pdf.assign(
**{
f"elg_mon_{mon}": (
~(
pdf[f"MAX_ELG_CD_MO_{mon}"]
.str.strip()
.isin(["00", "99", "", "."])
| pdf[f"MAX_ELG_CD_MO_{mon}"].isna()
)
).astype(int)
for mon in range(1, 13)
}
)
)
total_elg = self.df[[f"elg_mon_{i}" for i in range(1, 13)]].sum(
axis=1
)
self.df = self.df.assign(
total_elg_mon=total_elg,
elg_full_year=(total_elg == 12).astype(int),
elg_over_9mon=(total_elg >= 9).astype(int),
elg_over_6mon=(total_elg >= 6).astype(int),
)
def _compute_gaps_and_enrollment(pdf):
elg_str = (
pdf[[f"elg_mon_{mon}" for mon in range(1, 13)]]
.astype(int)
.astype(str)
)
joined = elg_str.apply("".join, axis=1)
return pdf.assign(
max_gap=joined.str.split("1").apply(
lambda parts: max(len(p) for p in parts)
),
max_cont_enrollment=joined.str.split("0").apply(
lambda parts: max(len(p) for p in parts)
),
)
self.df = self.df.map_partitions(_compute_gaps_and_enrollment)
self.df = self.df.assign(
elg_cont_6mon=(self.df["max_cont_enrollment"] >= 6).astype(int)
)
lst_cols_to_delete = [f"MAS_ELG_MON_{mon}" for mon in range(1, 13)] + [
f"BOE_ELG_MON_{mon}" for mon in range(1, 13)
]
self.df = self.df.drop(lst_cols_to_delete, axis=1)
[docs]
def flag_restricted_benefits(self) -> None:
"""
Checks individual's eligibility for various medicaid services,
based on EL_RSTRCT_BNFT_FLG_{month} values,
- 1 = full scope; INDIVIDUAL IS ELIGIBLE FOR MEDICAID DURING THE
MONTH AND IS ENTITLED TO THE FULL SCOPE OF MEDICAID BENEFITS.
- 2 = alien; INDIVIDUAL IS ELIGIBLE FOR MEDICAID DURING THE
MONTH BUT ONLY ENTITLED TO RESTRICTED BENEFITS
BASED ON ALIEN STATUS
- 3 = dual
- 4 = pregnancy
- 5 = other, eg. substance abuse, medically needy
- 6 = family planning
- 7 = alternative package of benchmark equivalent coverage,
2011 data had no values of 7 and 8
- 8 = "money follows the person" rebalancing demonstration,
2011 data had no values of 7 and 8
- 9 = unknown
- A = Psychiatric residential treatments demonstration
- B = Health Opportunity Account
- C = CHIP dental coverage, supplemental to employer sponsored
insurance
- W = Medicaid health insurance premium payment assistance (MA,
NJ, VT, OK)
- X = rx drug
- Y = drug and dual
- Z = drug and dual, but Medicaid was not paying for the benefits.
Benefits are non-comprehensive (restricted) when
EL_RSTRCT_BNFT_FLG_{month} has any of the below values:
- "2", "3", "6": for states other than "AR", "ID", "SD"
- "2", "4", "3", "6": for states "AR", "ID", "SD"
New column(s):
- any_restricted_benefit_month: 0 or 1, 1 when bene's benefits
are restricted for atleast 1 month
- restricted_benefit_months: Number of restricted benefit months
- restricted_benefits: 0 or 1, 1 when number of restricted
benefit months are more than the number of number of months the
bene was enrolled in medicaid
"""
lst_excluded_restricted_benefit_code = (
["2", "3", "6"]
if self.state not in ["AR", "ID", "SD"]
else ["2", "4", "3", "6"]
)
self.df = self.df.map_partitions(
lambda pdf: pdf.assign(
any_restricted_benefit_month=np.column_stack(
[
pdf[col]
.fillna("0")
.str.strip()
.isin(lst_excluded_restricted_benefit_code)
for col in [
f"EL_RSTRCT_BNFT_FLG_{str(mon)}"
for mon in range(1, 13)
]
]
)
.any(axis=1)
.astype(int),
restricted_benefit_months=pd.DataFrame(
np.column_stack(
[
pdf[col]
.fillna("0")
.str.strip()
.isin(lst_excluded_restricted_benefit_code)
.astype(int)
.astype(str)
for col in [
f"EL_RSTRCT_BNFT_FLG_{str(mon)}"
for mon in range(1, 13)
]
]
)
)
.agg("".join, axis=1)
.astype(str),
)
)
self.df = self.df.assign(
restricted_benefits=(
self.df["restricted_benefit_months"]
.str.replace("0", "")
.str.len()
> (12 - self.df["total_elg_mon"])
).astype(int)
)
[docs]
def flag_tanf(self) -> None:
"""
The Temporary Assistance for Needy Families (TANF) program provides
temporary financial assistance for pregnant women and families with
one or more dependent children. This provides financial assistance to
help pay for food, shelter, utilities, and expenses other than
medical. In MAX files this is identified via
EL_TANF_CASH_FLG:
- 1 = INDIVIDUAL DID NOT RECEIVE TANF BENEFITS DURING THE MONTH;
- 2 = INDIVIDUAL DID RECEIVE TANF BENEFITS DURING THE MONTH. CO
and ID either 0 or 9
New Column(s):
- tanf : 0 or 1, denoting usage of TANF benefits in any of the
months
"""
self.df = self.df.map_partitions(
lambda pdf: pdf.assign(
tanf=np.column_stack(
[
pd.to_numeric(pdf[col], errors="coerce").isin(
[2] if self.state not in ["CO", "ID"] else [0, 9]
)
for col in [
f"EL_TANF_CASH_FLG_{str(mon)}"
for mon in range(1, 13)
]
]
)
.any(axis=1)
.astype(int)
)
)