import glob
import logging
import os
import re
from datetime import datetime
from math import ceil
import dask.dataframe as dd
import numpy as np
import pandas as pd
import requests
import usaddress # pylint: disable=import-error
from fuzzywuzzy import process, fuzz # pylint: disable=import-error
from medicaid_utils.common_utils.usps_address import USPSAddress
hcris_lookup_folder = os.path.join(os.path.dirname(__file__), "data", "hcris")
nppes_lookup_folder = os.path.join(os.path.dirname(__file__), "data", "nppes")
fqhc_lookup_folder = os.path.join(os.path.dirname(__file__), "data", "fqhc")
pq_engine = "pyarrow"
# NPI Lookup params
npi_api = "https://npiregistry.cms.hhs.gov/api/?"
dct_params = {"enumeration_type": "NPI-2", "version": 2.1, "limit": 100}
[docs]
def combine_and_clean_hcris_files(logger_name):
    """Combines HCRIS provider info files and standardizes column names.

    Provider id is cleaned by removing non-alphanumeric characters.
    An additional version of provider id without leading zeroes is created.
    Rural Health Centers are excluded based on the facility-type code in
    the last four digits of the CCN. The combined dataset is written both
    as a pickle and as a CSV in the HCRIS lookup folder.

    Files are downloaded from multiple years, starting from 2014 when archives are available.
    2014: https://web.archive.org/web/20141120072219/http://downloads.cms.gov/Files/hcris/HCLINIC-REPORTS.zip
    2015: https://web.archive.org/web/20151230005431/http://downloads.cms.gov/Files/hcris/HCLINIC-REPORTS.zip
    2016: https://web.archive.org/web/20161228145429/http://downloads.cms.gov/Files/hcris/HCLINIC-REPORTS.zip
    2017: https://web.archive.org/web/20171220152135/http://downloads.cms.gov/Files/hcris/HCLINIC-REPORTS.zip
    2018: https://web.archive.org/web/20181226170727/http://downloads.cms.gov/Files/hcris/HCLINIC-REPORTS.zip
    2019: https://web.archive.org/web/20191226132614/http://downloads.cms.gov/Files/hcris/FQHC14-REPORTS.zip
    2020: https://web.archive.org/web/20201225141117/https://downloads.cms.gov/Files/hcris/FQHC14-REPORTS.zip
    2021: https://www.cms.gov/Research-Statistics-Data-and-Systems/
    Downloadable-Public-Use-Files/Cost-Reports/FQHC-224-2014-form

    Parameters
    ----------
    logger_name : str
        Name of the logger used for progress messages.

    Raises
    ------
    FileNotFoundError
        If no ``hclinic_provider_info_*.csv`` files are found in the
        HCRIS lookup folder.
    """
    logger = logging.getLogger(logger_name)
    lst_years = []
    # Maps raw column names (which vary across years) to canonical names
    dct_col_names = {
        "prvdr_num": "hclinic_provider_id",
        "fqhc_name": "hclinic_name",
        "street_addr": "hclinic_street_address",
        "po_box": "hclinic_po_box",
        "city": "hclinic_city",
        "state": "hclinic_state",
        "zip_code": "hclinic_zipcode",
        "county": "hclinic_county",
        "PROVIDER_NUMBER": "hclinic_provider_id",
        "FQHC14_Name": "hclinic_name",
        "Street_Addr": "hclinic_street_address",
        "PO_Box": "hclinic_po_box",
        "City": "hclinic_city",
        "State": "hclinic_state",
        "Zip_Code": "hclinic_zipcode",
        "County": "hclinic_county",
        "hclinic_Name": "hclinic_name",
        "Po_Box": "hclinic_po_box",
    }
    lst_col = list(set(dct_col_names.values()))
    lstdf_hcris = []
    for fname in glob.glob(
        os.path.join(hcris_lookup_folder, "hclinic_provider_info_*.csv")
    ):
        # File names end with the 4-digit year, e.g. hclinic_provider_info_2014.csv
        year = os.path.splitext(fname)[0][-4:]
        lst_years.append(int(year))
        logger.info("Processing %s HCRIS dataset", year)
        df_hcris = pd.read_csv(fname, dtype=object)
        df_hcris = df_hcris.rename(columns=dct_col_names)
        df_hcris = df_hcris[lst_col]
        # Last 4 digits of the provider id hold the CCN facility-type code
        df_hcris = df_hcris.assign(
            ccn=pd.to_numeric(
                df_hcris["hclinic_provider_id"].str.strip().str[-4:],
                errors="coerce",
            )
        )
        # Removing Rural Health Centers: keep only rows in the FQHC code ranges
        df_hcris = df_hcris.loc[
            df_hcris["ccn"].between(1800, 1989, inclusive="both")
            | df_hcris["ccn"].between(1000, 1199, inclusive="both")
        ]
        df_hcris = df_hcris.drop(["ccn"], axis=1)
        lstdf_hcris.append(df_hcris)
    if not lstdf_hcris:
        # Fail fast with a clear message; pd.concat([]) / min([]) would
        # otherwise raise cryptic errors below
        raise FileNotFoundError(
            f"No hclinic_provider_info_*.csv files found in {hcris_lookup_folder}"
        )
    df_hcris = (
        pd.concat(lstdf_hcris, ignore_index=True)
        .reset_index(drop=True)
        .drop_duplicates()
    )
    # Cleaning provider_id column by removing non-alphanumeric characters
    df_hcris = df_hcris.assign(
        hclinic_provider_id_cleaned=df_hcris["hclinic_provider_id"]
        .str.replace("[^a-zA-Z0-9]+", "", regex=True)
        .str.upper()
    )
    # Cleaning provider_id column by removing non-alphanumeric characters & leading zeros
    df_hcris = df_hcris.assign(
        hclinic_provider_id_no_leading_zero=df_hcris[
            "hclinic_provider_id_cleaned"
        ].str.lstrip("0")
    )
    fname_pickle = os.path.join(hcris_lookup_folder, "hcris_all_years.pickle")
    fname_csv = os.path.join(
        hcris_lookup_folder, f"hcris_{min(lst_years)}_{max(lst_years)}.csv"
    )
    logger.info(
        "%s HCRIS datasets were merged and cleaned, and is saved at %s & %s",
        ','.join(str(yr) for yr in lst_years),
        fname_pickle,
        fname_csv,
    )
    df_hcris.to_csv(fname_csv, index=False)
    df_hcris.to_pickle(fname_pickle)
[docs]
def clean_hclinic_name(hclinic_name):
    """Strip every character that is not a letter, digit or space from a
    clinic name, then collapse runs of spaces into a single space."""
    alnum_only = re.sub("[^a-zA-Z0-9 ]+", "", hclinic_name)
    return re.sub(" +", " ", alnum_only)
[docs]
def clean_zip(zipcode):
    """Return the zipcode as a string with every hyphen removed."""
    return "".join(ch for ch in str(zipcode) if ch != "-")
[docs]
def get_taxonomies(lstdct_tax):
    """Concatenate the taxonomy codes from a list of taxonomy dicts into a
    single comma-separated string."""
    lst_codes = []
    for dct_tax in lstdct_tax:
        lst_codes.append(dct_tax["code"])
    return ",".join(lst_codes)
[docs]
def filter_partial_matches(df):
    """Filter source x NPPES matches, keeping rows whose name or address
    similarity score exceeds 60 AND whose city similarity score is at
    least 70.

    Parameters
    ----------
    df : dask.dataframe.DataFrame
        Candidate matches; must contain the name/address columns listed
        in the assign block below, plus whatever
        ``compute_basic_match_purity`` requires.

    Returns
    -------
    pandas.DataFrame
        The filtered matches, computed into memory.
    """
    # Normalize missing values so fuzzy scoring always receives strings
    df = df.replace(to_replace=[None, "None"], value=np.nan).fillna("")
    df = df.assign(
        **{
            col: df[col].astype(str)
            for col in [
                "provider_id_state",
                "p_loc_line_1",
                "p_loc_line_2",
                "p_loc_city",
                "p_loc_state",
                "p_mail_line_1",
                "p_mail_line_2",
                "p_mail_city",
                "p_mail_state",
                "hclinic_name",
                "hclinic_street_address",
                "hclinic_city",
                "hclinic_state",
                "hclinic_business_address",
                "hclinic_business_city",
                "hclinic_business_state",
                "hclinic_mail_address",
                "hclinic_mail_city",
                "hclinic_mail_state",
            ]
        }
    )
    # Adds name_score, address_score, city_score & state_match columns
    df = df.map_partitions(compute_basic_match_purity)
    df = df.loc[
        (df[["name_score", "address_score"]] > 60).any(axis=1)
        & (df["city_score"] >= 70)
    ].compute()
    return df
[docs]
def nppes_api_requests(pdf, logger_name, **dct_request_params):
    """Match HCLINIC providers to NPPES records via the NPPES registry API.

    Three passes of API calls are made, each relaxing the search a little
    further for the providers that are still unmatched:

    1. organization name, city, state & zipcode
    2. organization name, state & zipcode
    3. organization name as a wildcard prefix (``name*``), state & zipcode

    Parameters
    ----------
    pdf : pandas.DataFrame
        Providers to look up; must contain the columns named in
        ``dct_request_params`` and ``hclinic_provider_id_cleaned``.
    logger_name : str
        Name of the logger used for progress messages.
    dct_request_params : dict
        Maps the API parameter names ('organization_name', 'city',
        'state', 'postal_code') to the corresponding column names in
        ``pdf``.

    Returns
    -------
    pandas.DataFrame
        Rows that returned at least one NPPES result, with ``npi_json``,
        ``result_count`` and ``results`` columns added.
    """
    logger = logging.getLogger(logger_name)
    # NPI Lookup params
    npi_api = "https://npiregistry.cms.hhs.gov/api/?"
    dct_params = {"enumeration_type": "NPI-2", "version": 2.1, "limit": 100}

    def _build_params(row, include_city, wildcard_name):
        """Build the query-string parameters for a single provider row."""
        org_name = clean_hclinic_name(
            row[dct_request_params["organization_name"]]
        )
        if wildcard_name:
            # Trailing '*' makes the registry treat the name as a prefix
            org_name = org_name + "*"
        params = {
            **dct_params,
            "organization_name": org_name,
            "state": row[dct_request_params["state"]],
            "postal_code": clean_zip(row[dct_request_params["postal_code"]]),
        }
        if include_city:
            params["city"] = row[dct_request_params["city"]]
        return params

    def _query_api(pdf_subset, include_city, wildcard_name):
        """Run the API calls in parallel dask partitions, retrying the
        whole batch until it completes without a request error."""
        ddf = dd.from_pandas(pdf_subset, npartitions=30)
        while True:
            try:
                return ddf.map_partitions(
                    lambda part: part.assign(
                        npi_json=part.apply(
                            lambda row: requests.get(
                                npi_api,
                                params=_build_params(
                                    row, include_city, wildcard_name
                                ),
                                timeout=30,
                            ).json(),
                            axis=1,
                        )
                    )
                ).compute()
            except Exception:  # pylint: disable=broad-exception-caught
                logger.info("Retrying API calls")

    def _expand_counts(pdf_subset):
        """Expand the raw JSON responses into result_count & results columns."""
        pdf_subset = pdf_subset.reset_index(drop=True)
        return pdf_subset.merge(
            pdf_subset["npi_json"].apply(pd.Series)[
                ["result_count", "results"]
            ],
            left_index=True,
            right_index=True,
        )

    # Pass 1: using hclinic name, city, state and zipcode in the API call
    df_api_based = _expand_counts(
        _query_api(pdf, include_city=True, wildcard_name=False)
    )
    df_api_based_relaxed_city = df_api_based.loc[
        df_api_based["result_count"] < 1
    ]
    df_api_based_matched = df_api_based.loc[df_api_based["result_count"] >= 1]
    logger.info(
        "%s HCLINIC providers were matched using "
        "NPPES API calls with %s, %s, %s and %s ",
        df_api_based_matched.hclinic_provider_id_cleaned.nunique(),
        dct_request_params['organization_name'],
        dct_request_params['city'],
        dct_request_params['state'],
        dct_request_params['postal_code'],
    )
    # Pass 2: using hclinic name, state and zipcode (city dropped)
    df_api_based_relaxed_city = _query_api(
        df_api_based_relaxed_city, include_city=False, wildcard_name=False
    )
    df_api_based_relaxed_city = _expand_counts(
        df_api_based_relaxed_city.drop(["result_count", "results"], axis=1)
    )
    df_api_based_relaxed_name = df_api_based_relaxed_city.loc[
        df_api_based_relaxed_city["result_count"] < 1
    ]
    df_api_based_relaxed_city = df_api_based_relaxed_city.loc[
        df_api_based_relaxed_city["result_count"] >= 1
    ]
    logger.info(
        "%s HCLINIC providers were matched using "
        "NPPES API calls with %s, %s and %s ",
        df_api_based_relaxed_city.hclinic_provider_id_cleaned.nunique(),
        dct_request_params['organization_name'],
        dct_request_params['state'],
        dct_request_params['postal_code'],
    )
    df_api_based_matched = pd.concat(
        [df_api_based_matched, df_api_based_relaxed_city], ignore_index=True
    ).reset_index(drop=True)
    # Pass 3: using hclinic name (starting with), state and zipcode
    df_api_based_relaxed_name = _query_api(
        df_api_based_relaxed_name, include_city=False, wildcard_name=True
    )
    df_api_based_relaxed_name = _expand_counts(
        df_api_based_relaxed_name.drop(["result_count", "results"], axis=1)
    )
    df_api_based_relaxed_name = df_api_based_relaxed_name.loc[
        df_api_based_relaxed_name["result_count"] >= 1
    ]
    logger.info(
        "%s HCLINIC providers were matched using "
        "NPPES API calls with "
        "%s (starting with), %s "
        "and %s",
        df_api_based_relaxed_name.hclinic_provider_id_cleaned.nunique(),
        dct_request_params['organization_name'],
        dct_request_params['state'],
        dct_request_params['postal_code'],
    )
    df_api_based_matched = pd.concat(
        [df_api_based_matched, df_api_based_relaxed_name], ignore_index=True
    ).reset_index(drop=True)
    return df_api_based_matched
[docs]
def standardize_addresses(pdf):
    """Standardize addresses using the USPS API.

    New columns added to ``pdf``:
        standardized_address - Address returned by USPS API
        standardized_street_address - Street component of the standardized address
        standardized - 0 or 1, indicates successful standardization
        address_error - Error text returned by USPS API, if any

    Parameters
    ----------
    pdf : pandas.DataFrame
        Must contain 'address_cleaned', 'city', 'state' and 'zip' columns.

    Returns
    -------
    pandas.DataFrame
    """
    # sleep_time = 0 if (datetime.today().weekday() in [5, 6]) else 0.12

    def parse_address(address, city, state, zipcode):
        """Make a single USPS API call; returns [standardized_address,
        standardized_street_address, standardized, address_error]."""
        # Reformat 9-digit zips into ZIP+4 form ('12345-6789'). This
        # previously sliced zipcode[:4], which dropped the 5th digit.
        # NOTE(review): assumes zipcode arrives without a hyphen — confirm
        # upstream cleaning.
        zipcode = (
            zipcode if len(zipcode) <= 5 else zipcode[:5] + "-" + zipcode[5:]
        )
        usps_address = USPSAddress(
            name="", street=address, city=city, state=state, zip4=zipcode
        )
        standardized_address = ""
        standardized_street_address = ""
        address_error = ""
        try:
            standardized_address = usps_address.standardized()
            standardized_street_address = street_address(standardized_address)
            # Reads USPSAddress' protected error attribute for diagnostics
            address_error = usps_address._error
        except Exception:  # pylint: disable=broad-exception-caught
            address_error = "API call error"
        standardized = int(bool(standardized_address))
        # sleep(sleep_time)
        return [
            standardized_address,
            standardized_street_address,
            standardized,
            address_error,
        ]

    lst_standardized_address_col = [
        "standardized_address",
        "standardized_street_address",
        "standardized",
        "address_error",
    ]
    pdf = pdf.merge(
        pdf.apply(
            lambda row: pd.Series(
                parse_address(
                    row["address_cleaned"],
                    row["city"],
                    row["state"],
                    row["zip"],
                ),
                index=lst_standardized_address_col,
            ),
            axis=1,
            result_type="expand",
        ),
        left_index=True,
        right_index=True,
    )
    pdf = pdf.replace(to_replace=[None, "None"], value=np.nan).fillna("")
    return pdf
[docs]
def flatten_nppes_query_result(pdf):
    """Flatten NPPES API query results into one row per candidate match.

    Each row's ``results`` list is expanded so that every NPPES record
    becomes its own row, with the best fuzzy-matching organization name
    and address pulled into flat columns (npi, enumeration_date,
    p_org_name*, p_loc_* / p_mail_* address components, and fqhc/rural
    taxonomy flags).

    Parameters
    ----------
    pdf : pandas.DataFrame
        Must contain 'results' (list of NPPES record dicts) plus the
        hclinic name/address columns used as fuzzy-match targets.

    Returns
    -------
    pandas.DataFrame
    """

    def json_to_list(
        lstdct_match,
        target_name,
        target_name2,
        target_address,
        target_city,
        target_state,
        _lst_priority_taxonomies=None,
    ):
        """Expand every NPPES record dict in ``lstdct_match`` into a flat tuple."""
        lst_res = [
            expand_query_result(
                dct_res,
                target_name,
                target_name2,
                target_address,
                target_city,
                target_state,
            )
            for dct_res in lstdct_match
        ]
        return lst_res

    def expand_query_result(  # pylint: disable=too-many-locals
        dct_res,
        target_name,
        target_name2,
        target_address,
        target_city,
        target_state,
    ):
        """Pick the name & address from one NPPES record that best match
        the target clinic, and flag FQHC/rural taxonomies."""
        fqhc = 0
        rural = 0
        npi = dct_res["number"]
        lst_name = (
            [dct_res["basic"]["name"]] if "name" in dct_res["basic"] else []
        )
        enumeration_date = (
            dct_res["basic"]["enumeration_date"]
            if "enumeration_date" in dct_res["basic"]
            else ""
        )
        # Bug fix: the else branch previously reset lst_name to [],
        # discarding any already-collected names whenever
        # 'organization_name' was absent.
        lst_name = (
            lst_name + [dct_res["basic"]["organization_name"]]
            if ("organization_name" in dct_res["basic"])
            else lst_name
        )
        if "other_names" in dct_res:
            for dct_oth in dct_res["other_names"]:
                # Same fix: keep the accumulated names when an
                # other_names entry has no 'organization_name'.
                lst_name = (
                    lst_name + [dct_oth["organization_name"]]
                    if ("organization_name" in dct_oth)
                    else lst_name
                )
        orgname = (
            process.extractOne(target_name, lst_name, scorer=fuzz.ratio)
            if (len(lst_name) > 0)
            else None
        )
        orgnameoth = ""
        if orgname is not None:
            orgname = orgname[0]
            # Best match among the remaining names, preferring the
            # secondary target name when it is available
            lst_othname = [n for n in lst_name if n != orgname]
            orgnameoth = (
                process.extractOne(
                    target_name2 if pd.notnull(target_name2) else target_name,
                    lst_othname,
                    scorer=fuzz.ratio,
                )
                if (len(lst_othname) > 0)
                else None
            )
            if orgnameoth is not None:
                orgnameoth = orgnameoth[0]
        # Copy so extending with practiceLocations does not mutate the
        # caller's result dict in place
        lstdct_address = list(
            dct_res["addresses"] if ("addresses" in dct_res) else []
        )
        lstdct_address.extend(
            dct_res["practiceLocations"]
            if ("practiceLocations" in dct_res)
            else []
        )
        lst_addresses = [
            dct_adrs["address_1"]
            + " "
            + dct_adrs["address_2"]
            + " "
            + dct_adrs["city"]
            + " "
            + dct_adrs["state"]
            for dct_adrs in lstdct_address
        ]
        target_address_full = (
            (target_address if pd.notnull(target_address) else "")
            + " "
            + (target_city if pd.notnull(target_city) else "")
            + " "
            + (target_state if pd.notnull(target_state) else "")
        )
        address = (
            process.extractOne(
                target_address_full, lst_addresses, scorer=fuzz.ratio
            )
            if (len(lst_addresses) > 0)
            else None
        )
        # Default every address component to "" when nothing matches
        plocline1 = plocline2 = ploccityname = plocstatename = ploczip = ""
        pmailline1 = pmailline2 = pmailcityname = pmailstatename = pmailzip = ""
        if address is not None:
            address = address[0]
            adr_id = lst_addresses.index(address)
            plocline1 = lstdct_address[adr_id]["address_1"]
            plocline2 = lstdct_address[adr_id]["address_2"]
            ploccityname = lstdct_address[adr_id]["city"]
            plocstatename = lstdct_address[adr_id]["state"]
            ploczip = lstdct_address[adr_id]["postal_code"]
            # Second-best address is reported in the p_mail_* columns
            lst_other_address = [
                adr for adr in lst_addresses if adr != address
            ]
            othaddress = (
                process.extractOne(
                    target_address_full, lst_other_address, scorer=fuzz.ratio
                )
                if (len(lst_other_address) > 0)
                else None
            )
            if othaddress is not None:
                othaddress = othaddress[0]
                othadr_id = lst_addresses.index(othaddress)
                pmailline1 = lstdct_address[othadr_id]["address_1"]
                pmailline2 = lstdct_address[othadr_id]["address_2"]
                pmailcityname = lstdct_address[othadr_id]["city"]
                pmailstatename = lstdct_address[othadr_id]["state"]
                pmailzip = lstdct_address[othadr_id]["postal_code"]
        lst_tax = [dcttax["code"] for dcttax in dct_res["taxonomies"]]
        # Taxonomy codes: 261QF0400X (FQHC), 261QR1300X (rural health clinic)
        if "261QF0400X" in lst_tax:
            fqhc = 1
        if "261QR1300X" in lst_tax:
            rural = 1
        return (
            npi,
            enumeration_date,
            orgname,
            orgnameoth,
            plocline1,
            plocline2,
            ploccityname,
            plocstatename,
            ploczip,
            pmailline1,
            pmailline2,
            pmailcityname,
            pmailstatename,
            pmailzip,
            fqhc,
            rural,
        )

    pdf = pdf.assign(
        lst_matches=pdf.apply(
            lambda row: json_to_list(
                row["results"],
                row["hclinic_name"],
                np.nan,
                row["hclinic_street_address"],
                row["hclinic_city"],
                row["hclinic_state"],
            ),
            axis=1,
        )
    )
    # One row per candidate NPPES record
    pdf = pdf.explode("lst_matches").reset_index(drop=True)
    pdf = pdf.merge(
        pdf["lst_matches"].apply(
            pd.Series,
            index=[
                "npi",
                "enumeration_date",
                "p_org_name",
                "p_org_name_oth",
                "p_loc_line_1",
                "p_loc_line_2",
                "p_loc_city",
                "p_loc_state",
                "p_loc_zip",
                "p_mail_line_1",
                "p_mail_line_2",
                "p_mail_city",
                "p_mail_state",
                "p_mail_zip",
                "fqhc",
                "rural",
            ],
        ),
        left_index=True,
        right_index=True,
    )
    pdf = pdf.drop(["lst_matches"], axis=1)
    return pdf
[docs]
def clean_address(address):
    """Clean an address string by removing placenames, recipients and
    state names from street address lines and concatenating multiple
    occupancy identifiers / street numbers."""
    parsed = dict(usaddress.parse(address))
    cleaned = {}
    for token, label in parsed.items():
        # Drop non-street components entirely
        if label in ["NotAddress", "PlaceName", "Recipient", "StateName"]:
            continue
        # Drop the literal 'and' joining two occupancy identifiers
        if label == "OccupancyIdentifier" and token.lower().strip() == "and":
            continue
        if label not in cleaned:
            cleaned[label] = token
        else:
            # Identifiers/numbers are hyphen-joined; everything else
            # is space-joined
            sep = "-" if label.endswith(("Identifier", "Number")) else " "
            cleaned[label] = sep.join([cleaned[label], token])
    return " ".join(part.strip() for part in cleaned.values())
[docs]
def street_address(address):
    """Strips placenames, state and city names and zip codes, returning
    only the street address component."""
    skip_labels = ("NotAddress", "PlaceName", "Recipient", "StateName", "ZipCode")
    components = {}
    for token, label in dict(usaddress.parse(address)).items():
        skip = label in skip_labels or (
            label == "OccupancyIdentifier" and token.lower().strip() == "and"
        )
        if skip:
            continue
        if label in components:
            # Hyphen-join repeated identifiers/numbers, space-join the rest
            joiner = "-" if label.endswith(("Identifier", "Number")) else " "
            components[label] = joiner.join([components[label], token])
        else:
            components[label] = token
    return " ".join(value.strip() for value in components.values())
[docs]
def compute_basic_match_purity(pdf):
    """Computes levenshtein distance based similarity scores for address,
    name & city, and an equality match for state.

    Adds the columns ``name_score``, ``address_score``, ``city_score``
    (0-100 fuzzy ratios) and ``state_match`` (0/1) to ``pdf``.

    Parameters
    ----------
    pdf : pandas.DataFrame
        Candidate source x NPPES matches; must contain the hclinic_* and
        p_loc_*/p_mail_* columns listed in the assign block below.

    Returns
    -------
    pandas.DataFrame
    """

    def match_purity( # pylint: disable=too-many-arguments,too-many-locals
        p_org_name,
        p_org_name_oth,
        provider_id_state,
        hclinic_name,
        hclinic_site_name,
        hclinic_street_address,
        hclinic_street_address_line_2,
        hclinic_po_box,
        hclinic_city,
        hclinic_state,
        hclinic_business_address,
        hclinic_business_city,
        hclinic_business_state,
        hclinic_mail_address,
        hclinic_mail_city,
        hclinic_mail_state,
        p_loc_line_1,
        p_loc_line_2,
        p_loc_city,
        p_loc_state,
        p_mail_line_1,
        p_mail_line_2,
        p_mail_city,
        p_mail_state,
    ):
        """Score one candidate pair; returns
        [name_score, address_score, city_score, state_match]."""
        # Combine address components into single comparable strings
        hclinic_address = (
            (hclinic_street_address + " ").lstrip()
            + (hclinic_po_box + " ").lstrip()
            + (hclinic_street_address_line_2 + " ").lstrip()
        )
        p_loc_address = (p_loc_line_1 + " ").lstrip() + (
            p_loc_line_2 + " "
        ).lstrip()
        p_mail_address = (p_mail_line_1 + " ").lstrip() + (
            p_mail_line_2 + " "
        ).lstrip()
        hclinic_name = clean_hclinic_name(hclinic_name)
        hclinic_site_name = clean_hclinic_name(hclinic_site_name)
        p_org_name = clean_hclinic_name(p_org_name)
        p_org_name_oth = clean_hclinic_name(p_org_name_oth)
        # Line-2 fields sometimes carry the organization name, so they
        # participate in name scoring as well
        p_loc_line_2 = clean_hclinic_name(p_loc_line_2)
        p_mail_line_2 = clean_hclinic_name(p_mail_line_2)
        # Each score is the best fuzzy ratio over every available
        # candidate x target combination; empty strings contribute 0
        name_score = (
            0
            if (not bool(hclinic_name or hclinic_site_name))
            else max(
                0
                if (not bool(p_org_name))
                else max(
                    fuzz.ratio(p_org_name, hclinic_name),
                    fuzz.ratio(p_org_name, hclinic_site_name),
                ),
                0
                if (not bool(p_org_name_oth))
                else max(
                    fuzz.ratio(p_org_name_oth, hclinic_name),
                    fuzz.ratio(p_org_name_oth, hclinic_site_name),
                ),
                0
                if (not bool(p_loc_line_2))
                else max(
                    fuzz.ratio(p_loc_line_2, hclinic_name),
                    fuzz.ratio(p_loc_line_2, hclinic_site_name),
                ),
                0
                if (not bool(p_mail_line_2))
                else max(
                    fuzz.ratio(p_mail_line_2, hclinic_name),
                    fuzz.ratio(p_mail_line_2, hclinic_site_name),
                ),
            )
        )
        address_score = (
            0
            if (
                not bool(
                    hclinic_address
                    or hclinic_business_address
                    or hclinic_mail_address
                )
            )
            else max(
                0
                if (not bool(p_loc_address))
                else max(
                    fuzz.ratio(p_loc_address, hclinic_address),
                    fuzz.ratio(p_loc_address, hclinic_business_address),
                    fuzz.ratio(p_loc_address, hclinic_mail_address),
                ),
                0
                if (not bool(p_mail_address))
                else max(
                    fuzz.ratio(p_mail_address, hclinic_address),
                    fuzz.ratio(p_mail_address, hclinic_business_address),
                    fuzz.ratio(p_mail_address, hclinic_mail_address),
                ),
                0
                if (not bool(p_loc_line_1))
                else max(
                    fuzz.ratio(p_loc_line_1, hclinic_address),
                    fuzz.ratio(p_loc_line_1, hclinic_business_address),
                    fuzz.ratio(p_loc_line_1, hclinic_mail_address),
                ),
                0
                if (not bool(p_mail_line_1))
                else max(
                    fuzz.ratio(p_mail_line_1, hclinic_address),
                    fuzz.ratio(p_mail_line_1, hclinic_business_address),
                    fuzz.ratio(p_mail_line_1, hclinic_mail_address),
                ),
            )
        )
        city_score = (
            0
            if (
                not bool(
                    hclinic_city or hclinic_business_city or hclinic_mail_city
                )
            )
            else max(
                0
                if not bool(p_loc_city)
                else max(
                    fuzz.ratio(p_loc_city, hclinic_city),
                    fuzz.ratio(p_loc_city, hclinic_business_city),
                    fuzz.ratio(p_loc_city, hclinic_mail_city),
                ),
                0
                if not bool(p_mail_city)
                else max(
                    fuzz.ratio(p_mail_city, hclinic_city),
                    fuzz.ratio(p_mail_city, hclinic_business_city),
                    fuzz.ratio(p_mail_city, hclinic_mail_city),
                ),
            )
        )
        # 1 if any non-empty source state exactly equals any NPPES state
        state_match = int(
            (
                bool(hclinic_state)
                and (
                    hclinic_state
                    in [provider_id_state, p_loc_state, p_mail_state]
                )
            )
            or (
                bool(hclinic_business_state)
                and (
                    hclinic_business_state
                    in [provider_id_state, p_loc_state, p_mail_state]
                )
            )
            or (
                bool(hclinic_mail_state)
                and (
                    hclinic_mail_state
                    in [provider_id_state, p_loc_state, p_mail_state]
                )
            )
        )
        return [name_score, address_score, city_score, state_match]

    if "provider_id_state" not in pdf.columns:
        pdf["provider_id_state"] = ""
    # (removed a dead no-op `pdf = pdf.assign()` statement)
    pdf = pdf.replace(to_replace=[None, "None"], value=np.nan).fillna("")
    pdf = pdf.assign(
        **{
            col: pdf[col].str.strip().astype(str)
            for col in [
                "provider_id_state",
                "p_loc_line_1",
                "p_loc_line_2",
                "p_loc_city",
                "p_loc_state",
                "p_mail_line_1",
                "p_mail_line_2",
                "p_mail_city",
                "p_mail_state",
                "hclinic_name",
                "hclinic_street_address",
                "hclinic_po_box",
                "hclinic_city",
                "hclinic_state",
                "hclinic_site_name",
                "hclinic_street_address_line_2",
                "hclinic_business_address",
                "hclinic_business_state",
                "hclinic_business_city",
                "hclinic_business_zipcode",
                "hclinic_mail_address",
                "hclinic_mail_state",
                "hclinic_mail_city",
                "hclinic_mail_zipcode",
            ]
        }
    )
    pdf = pdf.merge(
        pdf.apply(
            lambda row: pd.Series(
                match_purity(
                    row["p_org_name"],
                    row["p_org_name_oth"],
                    row["provider_id_state"],
                    row["hclinic_name"],
                    row["hclinic_site_name"],
                    row["hclinic_street_address"],
                    row["hclinic_street_address_line_2"],
                    row["hclinic_po_box"],
                    row["hclinic_city"],
                    row["hclinic_state"],
                    row["hclinic_business_address"],
                    row["hclinic_business_city"],
                    row["hclinic_business_state"],
                    row["hclinic_mail_address"],
                    row["hclinic_mail_city"],
                    row["hclinic_mail_state"],
                    row["p_loc_line_1"],
                    row["p_loc_line_2"],
                    row["p_loc_city"],
                    row["p_loc_state"],
                    row["p_mail_line_1"],
                    row["p_mail_line_2"],
                    row["p_mail_city"],
                    row["p_mail_state"],
                ),
                index=[
                    "name_score",
                    "address_score",
                    "city_score",
                    "state_match",
                ],
            ),
            axis=1,
            result_type="expand",
        ),
        left_index=True,
        right_index=True,
    )
    return pdf
[docs]
def process_address_columns(pdf, logger_name, source="hcris"):
    """Combines street level components; concatenates multiple values belonging to the same address component type,
    such as multiple street or occupancy identifier numbers.

    Pipeline: combine address lines, clean them with ``clean_address``,
    extract digit runs, standardize every unique address via the USPS API
    (with a pickle cache so previously standardized addresses skip the
    API), merge the standardized components back onto ``pdf``, and
    finally compute door number columns.

    Parameters
    ----------
    pdf : pandas.DataFrame
        Provider dataset with hclinic_* and p_loc_*/p_mail_* address columns.
    logger_name : str
        Name of the logger used for progress messages.
    source : str, default "hcris"
        Either "hcris" or "uds"; controls which hclinic address
        components exist and how they are combined.

    Returns
    -------
    pandas.DataFrame
    """
    logger = logging.getLogger(logger_name)
    # Normalize missing values to empty strings before concatenation
    pdf = pdf.replace(to_replace=[None, "None"], value=np.nan).fillna("")
    pdf = pdf.assign(
        p_loc_address=(pdf["p_loc_line_1"] + " ").str.lstrip()
        + (pdf["p_loc_line_2"] + " ").str.lstrip(),
        p_mail_address=(pdf["p_mail_line_1"] + " ").str.lstrip()
        + (pdf["p_mail_line_2"] + " ").str.lstrip(),
    )
    if source == "hcris":
        # HCRIS has only a street address + PO box; no separate
        # business/mail addresses
        pdf = pdf.assign(
            hclinic_address=(pdf["hclinic_street_address"] + " ").str.lstrip()
            + (pdf["hclinic_po_box"] + " ").str.lstrip(),
            hclinic_business_address="",
            hclinic_mail_address="",
        )
    if source == "uds":
        pdf = pdf.assign(
            hclinic_address=(pdf["hclinic_street_address"] + " ").str.lstrip()
            + (pdf["hclinic_street_address_line_2"] + " ").str.lstrip(),
            hclinic_business_address=pdf[
                "hclinic_business_address"
            ].str.strip(),
            hclinic_mail_address=pdf["hclinic_mail_address"].str.strip(),
        )
    logger.info(
        "Cleaning address columns by removing placenames in street address lines and "
        "concatenating multiple occupancy identifier/ street numbers"
    )
    # Chunk into ~10k-row partitions so address parsing runs in parallel
    pdf = (
        dd.from_pandas(pdf, npartitions=ceil(pdf.shape[0] / 10000))
        .map_partitions(
            lambda pdf_partition: pdf_partition.assign(
                **{
                    f"{col}_cleaned": pdf_partition[col].apply(clean_address)
                    for col in [
                        "hclinic_address",
                        "hclinic_business_address",
                        "hclinic_mail_address",
                        "p_loc_address",
                        "p_mail_address",
                    ]
                }
            )
        )
        .compute()
    )
    logger.info("Get numeric components from address fields")
    # *_digits = all digit runs in the cleaned address, concatenated
    pdf = (
        dd.from_pandas(pdf, npartitions=ceil(pdf.shape[0] / 10000))
        .map_partitions(
            lambda pdf_partition: pdf_partition.assign(
                **{
                    f"{col}_digits": pdf_partition[f"{col}_cleaned"]
                    .str.findall(r"(\d+)")
                    .str.join("")
                    for col in [
                        "hclinic_address",
                        "hclinic_business_address",
                        "hclinic_mail_address",
                        "p_loc_address",
                        "p_mail_address",
                    ]
                }
            )
        )
        .compute()
    )
    pdf = pdf.replace(to_replace=[None, "None"], value=np.nan).fillna("")
    # Combining addresses to prepare for standardization: every address
    # variant is renamed to a common address_cleaned/city/state/zip
    # schema and pooled into one deduplicated frame
    pdf_hclinic_addresses = pdf[
        [
            "hclinic_address_cleaned",
            "hclinic_city",
            "hclinic_state",
            "hclinic_zipcode",
        ]
    ]
    pdf_hclinic_addresses = pdf_hclinic_addresses.rename(
        columns={
            col: col.replace("hclinic_", "").replace("code", "")
            for col in pdf_hclinic_addresses
        }
    )
    if source == "uds":
        pdf_hclinic_business_addresses = pdf[
            [
                "hclinic_business_address_cleaned",
                "hclinic_business_city",
                "hclinic_business_state",
                "hclinic_business_zipcode",
            ]
        ].drop_duplicates()
        pdf_hclinic_business_addresses = pdf_hclinic_business_addresses.rename(
            columns={
                col: "_".join(col.split("_")[2:]).replace("code", "")
                for col in pdf_hclinic_business_addresses
            }
        )
        pdf_hclinic_mail_addresses = pdf[
            [
                "hclinic_mail_address_cleaned",
                "hclinic_mail_city",
                "hclinic_mail_state",
                "hclinic_mail_zipcode",
            ]
        ].drop_duplicates()
        pdf_hclinic_mail_addresses = pdf_hclinic_mail_addresses.rename(
            columns={
                col: "_".join(col.split("_")[2:]).replace("code", "")
                for col in pdf_hclinic_mail_addresses
            }
        )
        pdf_hclinic_addresses = (
            pd.concat(
                [
                    pdf_hclinic_addresses,
                    pdf_hclinic_business_addresses,
                    pdf_hclinic_mail_addresses,
                ],
                ignore_index=True,
            )
            .drop_duplicates()
            .reset_index(drop=True)
        )
    pdf_loc_addresses = pdf[
        ["p_loc_address_cleaned", "p_loc_city", "p_loc_state", "p_loc_zip"]
    ]
    pdf_loc_addresses = pdf_loc_addresses.rename(
        columns={col: col.replace("p_loc_", "") for col in pdf_loc_addresses}
    )
    pdf_mail_addresses = pdf[
        ["p_mail_address_cleaned", "p_mail_city", "p_mail_state", "p_mail_zip"]
    ]
    pdf_mail_addresses = pdf_mail_addresses.rename(
        columns={col: col.replace("p_mail_", "") for col in pdf_mail_addresses}
    )
    pdf_addresses = (
        pd.concat(
            [pdf_hclinic_addresses, pdf_loc_addresses, pdf_mail_addresses],
            ignore_index=True,
        )
        .drop_duplicates()
        .reset_index(drop=True)
    )
    lst_address_col = list(pdf_addresses.columns)
    # Cache of previously standardized addresses; reused across runs to
    # avoid repeating USPS API calls
    fname_standardized_address = os.path.join(
        fqhc_lookup_folder, "standardized_addresses.pickle"
    )
    pdf_matched = pd.DataFrame(columns=lst_address_col)
    if os.path.exists(fname_standardized_address):
        pdf_addresses = pdf_addresses.merge(
            pd.read_pickle(fname_standardized_address),
            on=lst_address_col,
            how="left",
        )
        # standardized in {0, 1} means the cache already has a verdict
        pdf_matched = pdf_addresses.loc[
            pdf_addresses["standardized"].isin([0, 1])
        ]
        pdf_addresses = pdf_addresses.loc[
            ~pdf_addresses["standardized"].isin([0, 1])
        ][lst_address_col]
        logger.info(
            "%s addresses standardized using existing standardized addresses list",
            pdf_matched.shape[0],
        )
    logger.info("Standardize address using USPS API")
    if pdf_addresses.shape[0] > 0:
        pdf_addresses = (
            dd.from_pandas(
                pdf_addresses, npartitions=ceil(pdf_addresses.shape[0] / 10000)
            )
            .map_partitions(standardize_addresses)
            .compute()
        )
        logger.info(
            "%s addresses standardized using USPS API",
            pdf_addresses.shape[0],
        )
        if pdf_matched.shape[0] > 0:
            pdf_addresses = pd.concat(
                [pdf_matched, pdf_addresses], ignore_index=True
            ).reset_index(drop=True)
        # Saving standardized addresses; successful standardizations win
        # when the cache has duplicate rows for the same address
        pd.concat(
            [pd.read_pickle(fname_standardized_address), pdf_addresses],
            ignore_index=True,
        ).sort_values(["standardized"], ascending=False).drop_duplicates(
            lst_address_col
        ).reset_index(
            drop=True
        ).to_pickle(
            fname_standardized_address
        )
        logger.info(
            "Standardized address list is saved at %s",
            fname_standardized_address,
        )
    else:
        pdf_addresses = pdf_matched
    # Reattach the standardized components to every address variant by
    # renaming the pooled columns back to each variant's column names
    # NOTE(review): the business/mail merges assume hclinic_business_* /
    # hclinic_mail_* city/state/zipcode columns exist even when
    # source == "hcris" (only hclinic_business_address is created above)
    # — confirm upstream provides them.
    pdf = (
        pdf.merge(
            pdf_addresses.rename(
                columns={
                    col: f'hclinic_{col.replace("zip", "zipcode")}'
                    for col in pdf_addresses.columns
                }
            ),
            on=[
                f'hclinic_{col.replace("zip", "zipcode")}'
                for col in ["address_cleaned", "city", "state", "zip"]
            ],
            how="left",
        )
        .merge(
            pdf_addresses.rename(
                columns={col: f"p_loc_{col}" for col in pdf_addresses.columns}
            ),
            on=[
                f"p_loc_{col}"
                for col in ["address_cleaned", "city", "state", "zip"]
            ],
            how="left",
        )
        .merge(
            pdf_addresses.rename(
                columns={col: f"p_mail_{col}" for col in pdf_addresses.columns}
            ),
            on=[
                f"p_mail_{col}"
                for col in ["address_cleaned", "city", "state", "zip"]
            ],
            how="left",
        )
        .merge(
            pdf_addresses.rename(
                columns={
                    col: f'hclinic_business_{col.replace("zip", "zipcode")}'
                    for col in pdf_addresses.columns
                }
            ),
            on=[
                f'hclinic_business_{col.replace("zip", "zipcode")}'
                for col in ["address_cleaned", "city", "state", "zip"]
            ],
            how="left",
        )
        .merge(
            pdf_addresses.rename(
                columns={
                    col: f'hclinic_mail_{col.replace("zip", "zipcode")}'
                    for col in pdf_addresses.columns
                }
            ),
            on=[
                f'hclinic_mail_{col.replace("zip", "zipcode")}'
                for col in ["address_cleaned", "city", "state", "zip"]
            ],
            how="left",
        )
    )
    # Create door no columns (compute_door_no_info is defined elsewhere
    # in this module)
    logger.info("Computing door number info")
    pdf = compute_door_no_info(pdf)
    return pdf
[docs]
def compute_match_purity(pdf):
"""Computes levenshtein distance based similarity scores for address & name columns"""
def match_purity( # pylint: disable=too-many-arguments,too-many-locals
p_org_name,
p_org_name_oth,
provider_id_state,
hclinic_name,
hclinic_site_name,
hclinic_address_cleaned,
hclinic_standardized_address,
hclinic_standardized_street_address,
hclinic_standardized,
hclinic_address_digits,
hclinic_post_box_id,
hclinic_post_box_group_id,
hclinic_address_no,
hclinic_occupancy_no,
hclinic_city,
hclinic_state,
hclinic_business_address_cleaned,
hclinic_business_standardized_address,
hclinic_business_standardized_street_address,
hclinic_business_standardized,
hclinic_business_address_digits,
hclinic_business_post_box_id,
hclinic_business_post_box_group_id,
hclinic_business_address_no,
hclinic_business_occupancy_no,
hclinic_business_city,
hclinic_business_state,
hclinic_mail_address_cleaned,
hclinic_mail_standardized_address,
hclinic_mail_standardized_street_address,
hclinic_mail_standardized,
hclinic_mail_address_digits,
hclinic_mail_post_box_id,
hclinic_mail_post_box_group_id,
hclinic_mail_address_no,
hclinic_mail_occupancy_no,
hclinic_mail_city,
hclinic_mail_state,
p_loc_address_cleaned,
p_loc_standardized_address,
p_loc_standardized_street_address,
p_loc_address_digits,
p_loc_post_box_id,
p_loc_post_box_group_id,
p_loc_address_no,
p_loc_occupancy_no,
p_loc_line_1,
p_loc_line_2,
p_loc_city,
p_loc_state,
p_mail_address_cleaned,
p_mail_standardized_address,
p_mail_standardized_street_address,
p_mail_address_digits,
p_mail_post_box_id,
p_mail_post_box_group_id,
p_mail_address_no,
p_mail_occupancy_no,
p_mail_line_1,
p_mail_line_2,
p_mail_city,
p_mail_state,
):
hclinic_name = clean_hclinic_name(hclinic_name)
hclinic_site_name = clean_hclinic_name(hclinic_site_name)
p_org_name = clean_hclinic_name(p_org_name)
p_org_name_oth = clean_hclinic_name(p_org_name_oth)
p_loc_line_2 = clean_hclinic_name(p_loc_line_2)
p_mail_line_2 = clean_hclinic_name(p_mail_line_2)
name_score = (
0
if (not bool(hclinic_name or hclinic_site_name))
else max(
0
if (not bool(p_org_name))
else max(
fuzz.ratio(p_org_name, hclinic_name),
fuzz.ratio(p_org_name, hclinic_site_name),
),
0
if (not bool(p_org_name_oth))
else max(
fuzz.ratio(p_org_name_oth, hclinic_name),
fuzz.ratio(p_org_name_oth, hclinic_site_name),
),
0
if (not bool(p_loc_line_2))
else max(
fuzz.ratio(p_loc_line_2, hclinic_name),
fuzz.ratio(p_loc_line_2, hclinic_site_name),
),
0
if (not bool(p_mail_line_2))
else max(
fuzz.ratio(p_mail_line_2, hclinic_name),
fuzz.ratio(p_mail_line_2, hclinic_site_name),
),
)
)
address_score = (
0
if (
not bool(
hclinic_address_cleaned
or hclinic_business_address_cleaned
or hclinic_mail_address_cleaned
)
)
else max(
0
if (not bool(p_loc_address_cleaned))
else max(
fuzz.ratio(p_loc_address_cleaned, hclinic_address_cleaned),
fuzz.ratio(
p_loc_address_cleaned, hclinic_business_address_cleaned
),
fuzz.ratio(
p_loc_address_cleaned, hclinic_mail_address_cleaned
),
),
0
if (not bool(p_mail_address_cleaned))
else max(
fuzz.ratio(
p_mail_address_cleaned, hclinic_address_cleaned
),
fuzz.ratio(
p_mail_address_cleaned,
hclinic_business_address_cleaned,
),
fuzz.ratio(
p_mail_address_cleaned, hclinic_mail_address_cleaned
),
),
0
if (not bool(p_loc_line_1))
else max(
fuzz.ratio(p_loc_line_1, hclinic_address_cleaned),
fuzz.ratio(p_loc_line_1, hclinic_business_address_cleaned),
fuzz.ratio(p_loc_line_1, hclinic_mail_address_cleaned),
),
0
if (not bool(p_mail_line_1))
else max(
fuzz.ratio(p_mail_line_1, hclinic_address_cleaned),
fuzz.ratio(
p_mail_line_1, hclinic_business_address_cleaned
),
fuzz.ratio(p_mail_line_1, hclinic_mail_address_cleaned),
),
)
)
digits_match = (
0
if (
not bool(
hclinic_address_digits
or hclinic_business_address_digits
or hclinic_mail_address_digits
)
)
else int(
(
bool(hclinic_address_digits)
and (
hclinic_address_digits
in [p_loc_address_digits, p_mail_address_digits]
)
)
or (
bool(hclinic_business_address_digits)
and (
hclinic_business_address_digits
in [p_loc_address_digits, p_mail_address_digits]
)
)
or (
bool(hclinic_mail_address_digits)
and (
hclinic_mail_address_digits
in [p_loc_address_digits, p_mail_address_digits]
)
)
)
)
def match_door_no(
source_post_box_id,
source_post_box_group_id,
source_address_no,
source_occupancy_no,
):
return (
0
if not bool(
source_post_box_id
or source_post_box_group_id
or source_address_no
or source_occupancy_no
)
else int(
(
(bool(source_address_no))
& (
(
(source_address_no == p_loc_address_no)
& (
(source_occupancy_no == p_loc_occupancy_no)
| (
not bool(
source_occupancy_no
or p_loc_occupancy_no
)
)
)
)
| (
(source_address_no == p_mail_address_no)
& (
(
source_occupancy_no
== p_mail_occupancy_no
)
| (
not bool(
source_occupancy_no
or p_mail_occupancy_no
)
)
)
)
)
)
| (
(bool(source_post_box_id))
& (
(
(source_post_box_id == p_loc_post_box_id)
& (
(
source_post_box_group_id
== p_loc_post_box_group_id
)
| (
not bool(
source_post_box_group_id
or p_loc_post_box_group_id
)
)
)
)
| (
(source_post_box_id == p_mail_post_box_id)
& (
(
source_post_box_group_id
== p_mail_post_box_group_id
)
| (
not bool(
source_post_box_group_id
or p_mail_post_box_group_id
)
)
)
)
)
)
)
)
door_no_match = max(
match_door_no(
hclinic_post_box_id,
hclinic_post_box_group_id,
hclinic_address_no,
hclinic_occupancy_no,
),
match_door_no(
hclinic_business_post_box_id,
hclinic_business_post_box_group_id,
hclinic_business_address_no,
hclinic_business_occupancy_no,
),
match_door_no(
hclinic_mail_post_box_id,
hclinic_mail_post_box_group_id,
hclinic_mail_address_no,
hclinic_mail_occupancy_no,
),
)
standardized_street_address_score = (
0
if (
not bool(
hclinic_standardized_street_address
or hclinic_business_standardized_street_address
or hclinic_mail_standardized_street_address
)
)
else max(
0
if (not bool(p_loc_standardized_street_address))
else max(
0
if (not bool(hclinic_standardized))
else fuzz.ratio(
p_loc_standardized_street_address,
hclinic_standardized_street_address,
),
0
if (not bool(hclinic_business_standardized))
else fuzz.ratio(
p_loc_standardized_street_address,
hclinic_business_standardized_street_address,
),
0
if (not bool(hclinic_mail_standardized))
else fuzz.ratio(
p_loc_standardized_street_address,
hclinic_mail_standardized_street_address,
),
),
0
if (not bool(p_mail_standardized_street_address))
else max(
0
if (not bool(hclinic_standardized))
else fuzz.ratio(
p_mail_standardized_street_address,
hclinic_standardized_street_address,
),
0
if (not bool(hclinic_business_standardized))
else fuzz.ratio(
p_mail_standardized_street_address,
hclinic_business_standardized_street_address,
),
0
if (not bool(hclinic_mail_standardized))
else fuzz.ratio(
p_mail_standardized_street_address,
hclinic_mail_standardized_street_address,
),
),
)
)
standardized_address_score = (
0
if (
not bool(
hclinic_standardized_address
or hclinic_business_standardized_address
or hclinic_mail_standardized_address
)
)
else max(
0
if (not bool(p_loc_standardized_address))
else max(
0
if (not bool(hclinic_standardized))
else fuzz.ratio(
p_loc_standardized_address,
hclinic_standardized_address,
),
0
if (not bool(hclinic_business_standardized))
else fuzz.ratio(
p_loc_standardized_address,
hclinic_business_standardized_address,
),
0
if (not bool(hclinic_mail_standardized))
else fuzz.ratio(
p_loc_standardized_address,
hclinic_mail_standardized_address,
),
),
0
if (not bool(p_mail_standardized_address))
else max(
0
if (not bool(hclinic_standardized))
else fuzz.ratio(
p_mail_standardized_address,
hclinic_standardized_address,
),
0
if (not bool(hclinic_business_standardized))
else fuzz.ratio(
p_mail_standardized_address,
hclinic_business_standardized_address,
),
0
if (not bool(hclinic_mail_standardized))
else fuzz.ratio(
p_mail_standardized_address,
hclinic_mail_standardized_address,
),
),
)
)
city_score = (
0
if (
not bool(
hclinic_city or hclinic_business_city or hclinic_mail_city
)
)
else max(
0
if not bool(p_loc_city)
else max(
fuzz.ratio(p_loc_city, hclinic_city),
fuzz.ratio(p_loc_city, hclinic_business_city),
fuzz.ratio(p_loc_city, hclinic_mail_city),
),
0
if not bool(p_mail_city)
else max(
fuzz.ratio(p_mail_city, hclinic_city),
fuzz.ratio(p_mail_city, hclinic_business_city),
fuzz.ratio(p_mail_city, hclinic_mail_city),
),
)
)
state_match = int(
(
bool(hclinic_state)
and (
hclinic_state
in [provider_id_state, p_loc_state, p_mail_state]
)
)
or (
bool(hclinic_business_state)
and (
hclinic_business_state
in [provider_id_state, p_loc_state, p_mail_state]
)
)
or (
bool(hclinic_mail_state)
and (
hclinic_mail_state
in [provider_id_state, p_loc_state, p_mail_state]
)
)
)
return [
name_score,
address_score,
digits_match,
door_no_match,
standardized_street_address_score,
standardized_address_score,
city_score,
state_match,
]
if "provider_id_state" not in pdf.columns:
pdf["provider_id_state"] = ""
pdf = pdf.replace(to_replace=[None, "None"], value=np.nan).fillna("")
pdf = pdf.assign(
**{
col: pdf[col].str.strip()
for col in [
"hclinic_post_box_id",
"hclinic_post_box_group_id",
"hclinic_address_no",
"hclinic_occupancy_no",
"hclinic_business_post_box_id",
"hclinic_business_post_box_group_id",
"hclinic_business_address_no",
"hclinic_business_occupancy_no",
"hclinic_mail_post_box_id",
"hclinic_mail_post_box_group_id",
"hclinic_mail_address_no",
"hclinic_mail_occupancy_no",
"p_loc_post_box_id",
"p_loc_post_box_group_id",
"p_loc_address_no",
"p_loc_occupancy_no",
"p_mail_post_box_id",
"p_mail_post_box_group_id",
"p_mail_address_no",
"p_mail_occupancy_no",
]
}
)
pdf = (
dd.from_pandas(pdf, npartitions=ceil(pdf.shape[0] / 10000))
.map_partitions(
lambda pdf_partition: pdf_partition.merge(
pdf_partition.apply(
lambda row: pd.Series(
match_purity(
row["p_org_name"],
row["p_org_name_oth"],
row["provider_id_state"],
row["hclinic_name"],
row["hclinic_site_name"],
row["hclinic_address_cleaned"],
row["hclinic_standardized_address"],
row["hclinic_standardized_street_address"],
row["hclinic_standardized"],
row["hclinic_address_digits"],
row["hclinic_post_box_id"],
row["hclinic_post_box_group_id"],
row["hclinic_address_no"],
row["hclinic_occupancy_no"],
row["hclinic_city"],
row["hclinic_state"],
row["hclinic_business_address_cleaned"],
row["hclinic_business_standardized_address"],
row[
"hclinic_business_standardized_street_address"
],
row["hclinic_business_standardized"],
row["hclinic_business_address_digits"],
row["hclinic_business_post_box_id"],
row["hclinic_business_post_box_group_id"],
row["hclinic_business_address_no"],
row["hclinic_business_occupancy_no"],
row["hclinic_business_city"],
row["hclinic_business_state"],
row["hclinic_mail_address_cleaned"],
row["hclinic_mail_standardized_address"],
row["hclinic_mail_standardized_street_address"],
row["hclinic_mail_standardized"],
row["hclinic_mail_address_digits"],
row["hclinic_mail_post_box_id"],
row["hclinic_mail_post_box_group_id"],
row["hclinic_mail_address_no"],
row["hclinic_mail_occupancy_no"],
row["hclinic_mail_city"],
row["hclinic_mail_state"],
row["p_loc_address_cleaned"],
row["p_loc_standardized_address"],
row["p_loc_standardized_street_address"],
row["p_loc_address_digits"],
row["p_loc_post_box_id"],
row["p_loc_post_box_group_id"],
row["p_loc_address_no"],
row["p_loc_occupancy_no"],
row["p_loc_line_1"],
row["p_loc_line_2"],
row["p_loc_city"],
row["p_loc_state"],
row["p_mail_address_cleaned"],
row["p_mail_standardized_address"],
row["p_mail_standardized_street_address"],
row["p_mail_address_digits"],
row["p_mail_post_box_id"],
row["p_mail_post_box_group_id"],
row["p_mail_address_no"],
row["p_mail_occupancy_no"],
row["p_mail_line_1"],
row["p_mail_line_2"],
row["p_mail_city"],
row["p_mail_state"],
),
index=[
"name_score",
"address_score",
"digits_match",
"door_no_match",
"standardized_street_address_score",
"standardized_address_score",
"city_score",
"state_match",
],
),
axis=1,
result_type="expand",
),
left_index=True,
right_index=True,
)
)
.compute()
)
return pdf
[docs]
def compute_door_no_info(pdf):
    """Adds door number component columns parsed from address columns.

    Each HCLINIC (primary, business, mail) and NPPES (practice location,
    mail) address is parsed with ``usaddress`` and four components are
    extracted into new columns per address: USPS post box id, post box
    group id, address (street) number and occupancy (unit/suite)
    identifier. When an address was successfully standardized (the
    corresponding ``*_standardized`` flag column is truthy) the
    standardized address is parsed; otherwise the cleaned raw address is
    used.

    Parameters
    ----------
    pdf : pd.DataFrame
        Dataframe with cleaned and standardized address columns, plus
        ``*_standardized`` success-flag columns

    Returns
    -------
    pd.DataFrame
        Input dataframe with 20 additional door number component columns;
        missing components are empty strings
    """

    def cleanup_address_components(dct_address):
        """Inverts a usaddress token->label mapping to label->token.

        ``usaddress.parse`` yields ``(token, label)`` pairs, so
        ``dict(...)`` of its output is keyed by token. This helper inverts
        that to label->token (so components can be looked up by label),
        drops noise components, and joins repeated labels with ``-`` for
        identifier/number labels and a space otherwise.
        """
        dct_address_cleaned = {}
        for token, label in dct_address.items():
            if (
                label == "OccupancyIdentifier"
                and token.lower().strip() == "and"
            ) or (
                label
                in ["NotAddress", "PlaceName", "Recipient", "StateName"]
            ):
                # Bug fix: this was `pass`, which made the condition a
                # no-op and kept the noise components it was meant to drop.
                continue
            dct_address_cleaned[label] = (
                token
                if label not in dct_address_cleaned
                else (
                    "-"
                    if label.endswith(
                        (
                            "Identifier",
                            "Number",
                        )
                    )
                    else " "
                ).join([dct_address_cleaned[label], token])
            )
        return dct_address_cleaned

    def parse_address(  # pylint: disable=too-many-arguments,too-many-locals
        hclinic_address_cleaned,
        hclinic_standardized_address,
        hclinic_standardized,
        hclinic_business_address_cleaned,
        hclinic_business_standardized_address,
        hclinic_business_standardized,
        hclinic_mail_address_cleaned,
        hclinic_mail_standardized_address,
        hclinic_mail_standardized,
        p_loc_address_cleaned,
        p_loc_standardized_address,
        p_loc_standardized,
        p_mail_address_cleaned,
        p_mail_standardized_address,
        p_mail_standardized,
    ):
        """Parses the five addresses of one row and returns the 20 door
        number components in column order (hclinic, business, mail, p_loc,
        p_mail; four components each)."""
        # Parse the standardized address when standardization succeeded,
        # falling back to the cleaned raw address otherwise.
        # Bug fix: cleanup_address_components was previously applied only
        # to the hclinic, p_loc and p_mail dicts; the business and mail
        # dicts stayed token-keyed, so their label-based component lookups
        # below always returned np.nan. All five are now cleaned.
        dct_hclinic_address = cleanup_address_components(
            dict(
                usaddress.parse(
                    hclinic_standardized_address
                    if hclinic_standardized
                    else hclinic_address_cleaned
                )
            )
        )
        dct_hclinic_business_address = cleanup_address_components(
            dict(
                usaddress.parse(
                    hclinic_business_standardized_address
                    if hclinic_business_standardized
                    else hclinic_business_address_cleaned
                )
            )
        )
        dct_hclinic_mail_address = cleanup_address_components(
            dict(
                usaddress.parse(
                    hclinic_mail_standardized_address
                    if hclinic_mail_standardized
                    else hclinic_mail_address_cleaned
                )
            )
        )
        dct_p_loc_address = cleanup_address_components(
            dict(
                usaddress.parse(
                    p_loc_standardized_address
                    if p_loc_standardized
                    else p_loc_address_cleaned
                )
            )
        )
        dct_p_mail_address = cleanup_address_components(
            dict(
                usaddress.parse(
                    p_mail_standardized_address
                    if p_mail_standardized
                    else p_mail_address_cleaned
                )
            )
        )
        # Extract the same four components from each parsed address, in the
        # same order as the output column index below.
        lst_components = []
        for dct_parsed in [
            dct_hclinic_address,
            dct_hclinic_business_address,
            dct_hclinic_mail_address,
            dct_p_loc_address,
            dct_p_mail_address,
        ]:
            lst_components.extend(
                [
                    dct_parsed.get("USPSBoxID", np.nan),
                    dct_parsed.get("USPSBoxGroupID", np.nan),
                    dct_parsed.get("AddressNumber", np.nan),
                    dct_parsed.get("OccupancyIdentifier", np.nan),
                ]
            )
        return lst_components

    # Normalize inputs: blank out missing address strings and coerce the
    # standardization success flags to int.
    pdf = pdf.assign(
        **{
            col: pdf[col].fillna("").str.strip()
            for col in "hclinic_address_cleaned,hclinic_standardized_address,"
            "hclinic_business_address_cleaned,hclinic_business_standardized_address,"
            "hclinic_mail_address_cleaned,hclinic_mail_standardized_address,"
            "p_loc_address_cleaned,p_loc_standardized_address,"
            "p_mail_address_cleaned,p_mail_standardized_address".split(",")
        },
        **{
            col: pdf[col].fillna(0).astype(int)
            for col in "hclinic_standardized,hclinic_business_standardized,hclinic_mail_standardized,"
            "p_loc_standardized,p_mail_standardized".split(",")
        },
    )
    pdf = pdf.replace(to_replace=[None, "None"], value=np.nan).fillna("")
    # Parse row-wise in parallel with dask, ~10k rows per partition, and
    # merge the 20 new component columns back onto each partition by index.
    pdf = (
        dd.from_pandas(pdf, npartitions=ceil(pdf.shape[0] / 10000))
        .map_partitions(
            lambda pdf_partition: pdf_partition.merge(
                pdf_partition.apply(
                    lambda row: pd.Series(
                        parse_address(
                            row["hclinic_address_cleaned"],
                            row["hclinic_standardized_address"],
                            row["hclinic_standardized"],
                            row["hclinic_business_address_cleaned"],
                            row["hclinic_business_standardized_address"],
                            row["hclinic_business_standardized"],
                            row["hclinic_mail_address_cleaned"],
                            row["hclinic_mail_standardized_address"],
                            row["hclinic_mail_standardized"],
                            row["p_loc_address_cleaned"],
                            row["p_loc_standardized_address"],
                            row["p_loc_standardized"],
                            row["p_mail_address_cleaned"],
                            row["p_mail_standardized_address"],
                            row["p_mail_standardized"],
                        ),
                        index=[
                            "hclinic_post_box_id",
                            "hclinic_post_box_group_id",
                            "hclinic_address_no",
                            "hclinic_occupancy_no",
                            "hclinic_business_post_box_id",
                            "hclinic_business_post_box_group_id",
                            "hclinic_business_address_no",
                            "hclinic_business_occupancy_no",
                            "hclinic_mail_post_box_id",
                            "hclinic_mail_post_box_group_id",
                            "hclinic_mail_address_no",
                            "hclinic_mail_occupancy_no",
                            "p_loc_post_box_id",
                            "p_loc_post_box_group_id",
                            "p_loc_address_no",
                            "p_loc_occupancy_no",
                            "p_mail_post_box_id",
                            "p_mail_post_box_group_id",
                            "p_mail_address_no",
                            "p_mail_occupancy_no",
                        ],
                    ),
                    axis=1,
                    result_type="expand",
                ),
                left_index=True,
                right_index=True,
            )
        )
        .compute()
    )
    # Replace missing components (np.nan from failed lookups) with "".
    pdf = pdf.replace(to_replace=[None, "None"], value=np.nan).fillna("")
    return pdf
[docs]
def fuzzy_match(
    df, df_npi_provider, source="hcris", logger_name="fuzzy_match"
):
    """Matches HCLINIC providers to organization NPIs in NPPES via fuzzy
    (string-distance based) name and address comparison.

    The source dataframe is processed in chunks of 100 rows. Each chunk is
    inner-merged with organization NPPES records (``entity == "2"``) on
    state columns to build candidate pairs, which are accumulated in a
    pickle file on disk. On the last chunk, match purity scores are
    computed for all accumulated candidates and thresholded to produce a
    "perfect" match set and a looser "fuzzy" match set.

    Parameters
    ----------
    df : pd.DataFrame
        Source (HCRIS/UDS) dataframe with hclinic_* columns
    df_npi_provider : dask.dataframe.DataFrame
        NPPES provider dataframe (merges stay lazy until materialized)
    source : str, default "hcris"
        Source dataset name; for "uds", business and mail state columns
        are additionally used for the candidate merges
    logger_name : str, default "fuzzy_match"
        Name of the logger to emit progress messages to

    Returns
    -------
    tuple of (pd.DataFrame, pd.DataFrame)
        (text-based perfect matches, remaining fuzzy matches); a pair of
        empty dataframes when ``df`` is empty
    """
    logger = logging.getLogger(logger_name)
    logger.info(
        "Attempting fuzzy text matching. Merging %s dataset with NPPES using state columns",
        source,
    )
    df = df.reset_index(drop=True)
    # Process the source in small chunks so each state-level candidate
    # merge stays bounded in memory; results accumulate on disk below.
    chunk_size = 100
    list_df = [
        df[i:i + chunk_size] for i in range(0, df.shape[0], chunk_size)
    ]
    # NOTE(review): nested os.path.join with a single argument is
    # redundant but harmless.
    fname_fuzzy_source_nppes_merge = os.path.join(
        os.path.join(fqhc_lookup_folder, f"nppes_{source}_text_merged.pickle")
    )
    # Start from a clean slate: drop any accumulator left by a prior run.
    if os.path.isfile(fname_fuzzy_source_nppes_merge):
        os.remove(fname_fuzzy_source_nppes_merge)
    for chunk_id, df_source in enumerate(list_df):
        logger.info("Processing chunk %s of %s", chunk_id + 1, len(list_df))
        # Candidates: organization NPIs whose provider-id state equals the
        # HCLINIC state (rows with blank hclinic_state are excluded).
        df_nppes_source_p_id_state = df_npi_provider.loc[
            df_npi_provider["entity"].str.strip() == "2"
        ].merge(
            df_source.loc[
                df_source["hclinic_state"].str.strip().str.len() > 0
            ],
            left_on=["provider_id_state"],
            right_on=["hclinic_state"],
            how="inner",
        )
        # Same merge on practice-location state; the inequality filter
        # skips NPIs already covered by the provider-id-state merge above.
        df_nppes_source_p_loc_state = df_npi_provider.loc[
            (df_npi_provider["entity"].str.strip() == "2")
            & (
                df_npi_provider["provider_id_state"]
                != df_npi_provider["p_loc_state"]
            )
        ].merge(
            df_source.loc[
                df_source["hclinic_state"].str.strip().str.len() > 0
            ],
            left_on=["p_loc_state"],
            right_on=["hclinic_state"],
            how="inner",
        )
        # Same merge on mailing state; filters skip NPIs whose mailing
        # state duplicates either state already merged on.
        df_nppes_source_p_mail_state = df_npi_provider.loc[
            (df_npi_provider["entity"].str.strip() == "2")
            & (
                (
                    df_npi_provider["provider_id_state"]
                    != df_npi_provider["p_mail_state"]
                )
                & (
                    (
                        df_npi_provider["p_loc_state"]
                        != df_npi_provider["p_mail_state"]
                    )
                )
            )
        ].merge(
            df_source.loc[
                df_source["hclinic_state"].str.strip().str.len() > 0
            ],
            left_on=["p_mail_state"],
            right_on=["hclinic_state"],
            how="inner",
        )
        df_nppes_source = dd.concat(
            [
                df_nppes_source_p_id_state,
                df_nppes_source_p_loc_state,
                df_nppes_source_p_mail_state,
            ],
            ignore_index=True,
        )
        # UDS sources also carry business and mail state columns; repeat
        # the same three-way state merge for each of them.
        if source == "uds":
            df_nppes_source_business_p_id_state = df_npi_provider.loc[
                df_npi_provider["entity"].str.strip() == "2"
            ].merge(
                df_source.loc[
                    df_source["hclinic_business_state"].str.strip().str.len()
                    > 0
                ],
                left_on=["provider_id_state"],
                right_on=["hclinic_business_state"],
                how="inner",
            )
            df_nppes_source_business_p_loc_state = df_npi_provider.loc[
                (df_npi_provider["entity"].str.strip() == "2")
                & (
                    df_npi_provider["provider_id_state"]
                    != df_npi_provider["p_loc_state"]
                )
            ].merge(
                df_source.loc[
                    df_source["hclinic_business_state"].str.strip().str.len()
                    > 0
                ],
                left_on=["p_loc_state"],
                right_on=["hclinic_business_state"],
                how="inner",
            )
            df_nppes_source_business_p_mail_state = df_npi_provider.loc[
                (df_npi_provider["entity"].str.strip() == "2")
                & (
                    (
                        df_npi_provider["provider_id_state"]
                        != df_npi_provider["p_mail_state"]
                    )
                    & (
                        (
                            df_npi_provider["p_loc_state"]
                            != df_npi_provider["p_mail_state"]
                        )
                    )
                )
            ].merge(
                df_source.loc[
                    df_source["hclinic_business_state"].str.strip().str.len()
                    > 0
                ],
                left_on=["p_mail_state"],
                right_on=["hclinic_business_state"],
                how="inner",
            )
            df_nppes_source_mail_p_id_state = df_npi_provider.loc[
                df_npi_provider["entity"].str.strip() == "2"
            ].merge(
                df_source.loc[
                    df_source["hclinic_mail_state"].str.strip().str.len() > 0
                ],
                left_on=["provider_id_state"],
                right_on=["hclinic_mail_state"],
                how="inner",
            )
            df_nppes_source_mail_p_loc_state = df_npi_provider.loc[
                (df_npi_provider["entity"].str.strip() == "2")
                & (
                    df_npi_provider["provider_id_state"]
                    != df_npi_provider["p_loc_state"]
                )
            ].merge(
                df_source.loc[
                    df_source["hclinic_mail_state"].str.strip().str.len() > 0
                ],
                left_on=["p_loc_state"],
                right_on=["hclinic_mail_state"],
                how="inner",
            )
            df_nppes_source_mail_p_mail_state = df_npi_provider.loc[
                (df_npi_provider["entity"].str.strip() == "2")
                & (
                    (
                        df_npi_provider["provider_id_state"]
                        != df_npi_provider["p_mail_state"]
                    )
                    & (
                        (
                            df_npi_provider["p_loc_state"]
                            != df_npi_provider["p_mail_state"]
                        )
                    )
                )
            ].merge(
                df_source.loc[
                    df_source["hclinic_mail_state"].str.strip().str.len() > 0
                ],
                left_on=["p_mail_state"],
                right_on=["hclinic_mail_state"],
                how="inner",
            )
            df_nppes_source = dd.concat(
                [
                    df_nppes_source,
                    df_nppes_source_business_p_id_state,
                    df_nppes_source_business_p_loc_state,
                    df_nppes_source_business_p_mail_state,
                    df_nppes_source_mail_p_id_state,
                    df_nppes_source_mail_p_loc_state,
                    df_nppes_source_mail_p_mail_state,
                ],
                ignore_index=True,
            )
        # filter_partial_matches is defined elsewhere in this module;
        # presumably it prunes weak candidates and materializes the dask
        # graph into a pandas frame — to_pickle below requires a pandas
        # object. TODO confirm against its definition.
        df_nppes_source = filter_partial_matches(df_nppes_source)
        # Append this chunk's candidates to the on-disk accumulator,
        # deduplicating against everything accumulated so far.
        if os.path.isfile(fname_fuzzy_source_nppes_merge):
            df_nppes_source = (
                pd.concat(
                    [
                        pd.read_pickle(fname_fuzzy_source_nppes_merge),
                        df_nppes_source,
                    ],
                    ignore_index=True,
                )
                .drop_duplicates()
                .reset_index(drop=True)
            )
        df_nppes_source.to_pickle(fname_fuzzy_source_nppes_merge)
        # Free the chunk's candidates before the next iteration.
        del df_nppes_source
        # On the final chunk, score and threshold all accumulated
        # candidates, then return from inside the loop.
        if (chunk_id + 1) == len(list_df):
            logger.info(
                "HCLINIC providers were matched on the basis of text (name & address) columns with organization "
                "NPIs from NPPES dataset, resulting in %s "
                "potential matches. This dataset is saved at %s",
                pd.read_pickle(fname_fuzzy_source_nppes_merge).shape[0],
                fname_fuzzy_source_nppes_merge,
            )
            df_nppes_source_fuzzy_matched = pd.read_pickle(
                fname_fuzzy_source_nppes_merge
            )
            # Clean/standardize address columns (module-level helper).
            df_nppes_source_fuzzy_matched = process_address_columns(
                df_nppes_source_fuzzy_matched, logger_name, source
            )
            logger.info(
                "Processing address columns for the %s text based matches",
                df_nppes_source_fuzzy_matched.shape[0],
            )
            # Drop any stale score columns (e.g. _x/_y merge suffixes from
            # earlier runs) before recomputing match purity scores.
            df_nppes_source_fuzzy_matched = compute_match_purity(
                df_nppes_source_fuzzy_matched[
                    [
                        col
                        for col in df_nppes_source_fuzzy_matched.columns
                        if col
                        not in [
                            "name_score_x",
                            "address_score_x",
                            "city_score_x",
                            "name_score_y",
                            "address_score_y",
                            "name_score",
                            "address_score",
                            "city_score",
                            "standardized_address_score",
                            "city_score_y",
                        ]
                    ]
                ]
            )
            fname_fuzzy_source_nppes_merge_with_match_purity = os.path.join(
                fqhc_lookup_folder,
                f"nppes_{source}_text_merged_with_match_purity.pickle",
            )
            logger.info(
                "Computing match purity for the %s text-based matches. This is "
                "saved at %s",
                df_nppes_source_fuzzy_matched.shape[0],
                fname_fuzzy_source_nppes_merge_with_match_purity,
            )
            df_nppes_source_fuzzy_matched.to_pickle(
                fname_fuzzy_source_nppes_merge_with_match_purity
            )
            # Address-based perfect matches: street/address score >= 70,
            # exact door number match, name score >= 50, city score >= 80.
            df_nppes_source_address_matched = (
                df_nppes_source_fuzzy_matched.loc[
                    (
                        df_nppes_source_fuzzy_matched[
                            [
                                "standardized_street_address_score",
                                "address_score",
                            ]
                        ]
                        >= 70
                    ).any(axis=1)
                    & (df_nppes_source_fuzzy_matched["door_no_match"] == 1)
                    & (df_nppes_source_fuzzy_matched["name_score"] >= 50)
                    & (df_nppes_source_fuzzy_matched["city_score"] >= 80)
                ]
            )
            # Name-based perfect matches: name and city scores both >= 80.
            df_nppes_source_name_matched = df_nppes_source_fuzzy_matched.loc[
                (df_nppes_source_fuzzy_matched["city_score"] >= 80)
                & (df_nppes_source_fuzzy_matched["name_score"] >= 80)
            ]
            logger.info(
                "Perfect address matches were arrived at by filtering matches with >= 70 as the string distance score "
                "for street addresses, 100%% matching door & street numbers, >=50 as the string distance score for "
                "names and >= 80 as the string distance score for cities. "
                "%s were matched this way.",
                df_nppes_source_address_matched.hclinic_provider_id.nunique(),
            )
            logger.info(
                "Perfect name matches were arrived at by filtering matches with >= 80 as the string distance score "
                "for names and >= 80 as the string distance score for cities."
                "%s were matched this way.",
                df_nppes_source_name_matched.hclinic_provider_id.nunique(),
            )
            # Union of both perfect-match sets, deduplicated.
            df_text_based_perfect_matches = (
                pd.concat(
                    [
                        df_nppes_source_address_matched,
                        df_nppes_source_name_matched,
                    ],
                    ignore_index=True,
                )
                .drop_duplicates()
                .reset_index(drop=True)
            )
            # Looser fuzzy set: address score >= 70, city score >= 80, and
            # either a door number match or name score >= 50.
            df_nppes_source_fuzzy_matched = df_nppes_source_fuzzy_matched.loc[
                (
                    df_nppes_source_fuzzy_matched[
                        ["standardized_street_address_score", "address_score"]
                    ]
                    >= 70
                ).any(axis=1)
                & (
                    (df_nppes_source_fuzzy_matched["door_no_match"] == 1)
                    | (df_nppes_source_fuzzy_matched["name_score"] >= 50)
                )
                & (df_nppes_source_fuzzy_matched["city_score"] >= 80)
            ]
            return df_text_based_perfect_matches, df_nppes_source_fuzzy_matched
    # Reached only when df is empty (no chunks to process).
    return pd.DataFrame(), pd.DataFrame()
[docs]
def create_npi_fqhc_crosswalk( # pylint: disable=too-many-locals
source: str = "hcris",
logger_name: str = "fqhc_crosswalk",
):
"""Merge HCRIS, UDS and NPPES and datasets to create FQHC NPI list"""
logger = logging.getLogger(logger_name)
# Load all available years of hcris datasets
if not os.path.exists(
os.path.join(hcris_lookup_folder, "hcris_all_years.pickle")
):
logger.info(
"Cleaning and combining all available years of HCRIS datasets"
)
combine_and_clean_hcris_files(logger_name)
df_source = pd.read_pickle(
os.path.join(hcris_lookup_folder, "hcris_all_years.pickle")
)
lst_source_col = df_source.columns
lst_disparate_col = [
"hclinic_po_box",
"hclinic_site_name",
"hclinic_street_address_line_2",
"hclinic_business_address",
"hclinic_business_state",
"hclinic_business_city",
"hclinic_business_zipcode",
"hclinic_mail_address",
"hclinic_mail_state",
"hclinic_mail_city",
"hclinic_mail_zipcode",
]
df_source = df_source.assign(
**{
col: ""
for col in lst_disparate_col
if col not in lst_source_col
}
)
lst_year = list(range(2009, datetime.now().year + 1))
# Load NPPES provider data for all available years
df_npi_provider = dd.concat(
[
dd.read_parquet(
os.path.join(
nppes_lookup_folder,
str(yr),
"npi_provider_parquet_cleaned",
),
engine=pq_engine,
index=False,
)
for yr in lst_year
]
)
# Create a version of NPPES datasets with cleaned providers id columns without leading zeros
df_npi_provider_cleaned = df_npi_provider.assign(
provider_id=df_npi_provider["provider_id"].str.rstrip("0")
)
# Merge source & NPPES data on provider id and state columns
df_source_matched = df_npi_provider.merge(
df_source,
left_on=["provider_id", "provider_id_state"],
right_on=["hclinic_provider_id_cleaned", "hclinic_state"],
how="inner",
).compute()
logger.info(
"%s HCLINIC providers were matched using "
"provider_id and provider_id_state columns",
df_source_matched.hclinic_provider_id_cleaned.nunique(),
)
df_source_matched_ploc = df_npi_provider.merge(
df_source,
left_on=["provider_id", "p_loc_state"],
right_on=["hclinic_provider_id_cleaned", "hclinic_state"],
how="inner",
).compute()
logger.info(
"%s HCLINIC providers were matched using "
"provider_id and p_loc_state columns",
df_source_matched_ploc.hclinic_provider_id_cleaned.nunique(),
)
df_source_matched_mloc = df_npi_provider.merge(
df_source,
left_on=["provider_id", "p_mail_state"],
right_on=["hclinic_provider_id_cleaned", "hclinic_state"],
how="inner",
).compute()
logger.info(
"%s HCLINIC providers were matched using "
"provider_id and m_loc_state columns",
df_source_matched_mloc.hclinic_provider_id_cleaned.nunique(),
)
df_source_matched = (
pd.concat(
[
df_source_matched,
df_source_matched_ploc,
df_source_matched_mloc,
],
ignore_index=True,
)
.reset_index()
.sort_values(["hclinic_provider_id_cleaned", "npi"])
.drop_duplicates(keep="first")
.reset_index(drop=True)
)
fname_nppes_state_matched = os.path.join(
fqhc_lookup_folder, f"{source}_nppes_based_matches.pickle"
)
logger.info(
"In total, %s HCLINIC providers were matched using "
"provider_id and state columns from NPPES. These matched are saved at %s",
df_source_matched.hclinic_provider_id_cleaned.nunique(),
fname_nppes_state_matched,
)
df_source_matched.to_pickle(fname_nppes_state_matched)
# Using NPPES API
# Using hclinic name, city, state and zipcode in the API call
df_api_based_matched = nppes_api_requests(
df_source,
logger_name,
**{
"organization_name": "hclinic_name",
"city": "hclinic_city",
"state": "hclinic_state",
"postal_code": "hclinic_zipcode",
},
)
logger.info(
"In total, %s HCLINIC providers were matched using "
"NPPES API calls",
df_api_based_matched.hclinic_provider_id_cleaned.nunique(),
)
# Expanding multiple NPPES matches in json data returned by NPPES API
logger.info(
"Expanding multiple NPPES matches in json data returned by NPPES API"
)
df_api_based_matched_flattened = flatten_nppes_query_result(
df_api_based_matched
)
df_api_based_matched_flattened = (
df_api_based_matched_flattened.reset_index(drop=True)
)
# Cleaning address columns
logger.info("Cleaning & standarding address columns")
df_api_based_matched_flattened = process_address_columns(
df_api_based_matched_flattened, logger_name, source
)
# Compute match purity
df_api_based_matched_flattened = compute_match_purity(
df_api_based_matched_flattened
)
df_api_based_matched_flattened = df_api_based_matched_flattened.loc[
(df_api_based_matched_flattened["name_score"] >= 70)
| (
(
df_api_based_matched_flattened[
["address_score", "standardized_street_address_score"]
]
>= 70
).any(axis=1)
& (df_api_based_matched_flattened["door_no_match"] == 1)
)
]
logger.info(
"%s HCLINIC providers were matched using API calls"
"after removing less clean matches (name score >70 or (address score >= 70 and same door number))",
df_api_based_matched.hclinic_provider_id_cleaned.nunique(),
)
fname_api_based_matches = os.path.join(
fqhc_lookup_folder, f"{source}_api_perfect_matches.pickle"
)
logger.info("API based matches are saved at %s", fname_api_based_matches)
df_api_based_matched_flattened.to_pickle(fname_api_based_matches)
df_api_based_matched_flattened = df_api_based_matched_flattened.drop(
["npi_json", "result_count", "results"], axis=1
)
df_api_based_matched_flattened["entity"] = "2"
df_perfect_matches = (
pd.concat(
[df_api_based_matched_flattened, df_source_matched],
ignore_index=True,
)
.sort_values(by=["hclinic_provider_id", "npi"])
.reset_index(drop=True)
)
fname_api_and_nppes_perfect_matches = os.path.join(
fqhc_lookup_folder, f"{source}_api_and_nppes_perfect_matches.pickle"
)
logger.info(
"In total, %s HCLINIC providers were matched "
"using exports and NPPES API calls. These matches are saved at %s",
df_perfect_matches.hclinic_provider_id_cleaned.nunique(),
fname_api_and_nppes_perfect_matches,
)
df_perfect_matches.to_pickle(fname_api_and_nppes_perfect_matches)
df_unmatched = df_source.loc[
~df_source["hclinic_provider_id"].isin(
df_perfect_matches["hclinic_provider_id"]
)
]
# Attempting match with NPPES export with only provider_id
logger.info("Attempting match with NPPES export with only provider_id")
df_state_relaxed = df_npi_provider.merge(
df_unmatched,
left_on=["provider_id"],
right_on=["hclinic_provider_id"],
how="inner",
).compute()
logger.info("Cleaning & standarding address in state relaxed matches")
df_state_relaxed = process_address_columns(
df_state_relaxed, logger_name, source
)
# Computing match purity
df_state_relaxed = compute_match_purity(df_state_relaxed)
logger.info("Keeping only the matches with name score > 80")
df_state_relaxed = df_state_relaxed.loc[
(df_state_relaxed["name_score"] >= 80)
]
df_state_relaxed = (
df_state_relaxed.drop_duplicates(
["hclinic_provider_id", "npi"], keep="first"
)
.sort_values(by=["hclinic_provider_id", "npi"])
.reset_index(drop=True)
)
logger.info(
"%s hclinic provider ids were matched in this step",
df_state_relaxed.hclinic_provider_id.nunique(),
)
df_source_matched = (
pd.concat([df_perfect_matches, df_state_relaxed], ignore_index=True)
.sort_values(by=["hclinic_provider_id", "npi"])
.reset_index(drop=True)
)
fname_with_state_relaxed = os.path.join(
fqhc_lookup_folder,
f"{source}_api_and_nppes_perfect_matches_with_state_relaxed.pickle",
)
logger.info(
"In total %s hclinic provider ids were matched as of this step."
" All matches till this step are saved at %s",
df_source_matched.hclinic_provider_id.nunique(),
fname_with_state_relaxed,
)
df_source_matched.to_pickle(fname_with_state_relaxed)
df_unmatched = df_source.loc[
~df_source["hclinic_provider_id"].isin(
df_source_matched["hclinic_provider_id"]
)
]
logger.info(
"%s are yet to be matched",
df_unmatched.hclinic_provider_id.nunique(),
)
df_text_based_perfect_matches, df_nppes_source_fuzzy_matched = fuzzy_match(
df_source, df_npi_provider, source, logger_name
)
fname_source_text_based_perfect_matches = os.path.join(
fqhc_lookup_folder, f"{source}_text_based_perfect_matches.pickle"
)
logger.info(
"In total, perfect matches for %s provider ids were arrived at using text distance metrics."
" These matches are saved at %s",
df_text_based_perfect_matches.hclinic_provider_id.nunique(),
fname_source_text_based_perfect_matches,
)
df_text_based_perfect_matches.to_pickle(
fname_source_text_based_perfect_matches
)
df_source_matched = (
pd.concat(
[df_source_matched, df_text_based_perfect_matches],
ignore_index=True,
)
.sort_values(by=["hclinic_provider_id", "npi"])
.drop_duplicates()
.reset_index(drop=True)
)
fname_source_perfect_matches = os.path.join(
fqhc_lookup_folder, f"h{source}_perfect_matches.pickle"
)
df_source_matched.to_pickle(fname_source_perfect_matches)
logger.info(
"In total, %s of the %s provider ids have perfect matches. All the perfect matches are saved at %s",
df_source_matched.hclinic_provider_id.nunique(),
df_source.hclinic_provider_id.nunique(),
fname_source_perfect_matches,
)
df_nppes_source_fuzzy_matched = df_nppes_source_fuzzy_matched[
~df_nppes_source_fuzzy_matched["npi"].isin(
df_source_matched["npi"].tolist()
)
]
fname_source_fuzzy_matches = os.path.join(
fqhc_lookup_folder, f"{source}_fuzzy_matches.pickle"
)
df_nppes_source_fuzzy_matched.to_pickle(fname_source_fuzzy_matches)
_n_fuzzy = df_nppes_source_fuzzy_matched.hclinic_provider_id.nunique()
_n_no_perfect = (
df_nppes_source_fuzzy_matched[
~df_nppes_source_fuzzy_matched['hclinic_provider_id'].isin(
df_source_matched.hclinic_provider_id
)
].hclinic_provider_id.nunique()
)
logger.info(
"Less perfect matches were arrived at by filtering matches with >= 70 as the string distance score "
"for address and >= 80 as the string distance score for cities. Door number match requirement was "
"relaxed and matches that either meet this criteria or have >= 50 as the string distance score for "
"names were allowed in this set. %s were matched this way. %s of these providers do not have "
"perfect matches. Less perfect matches are saved at %s",
_n_fuzzy, _n_no_perfect, fname_source_fuzzy_matches,
)
# Merge source & NPPES data on provider id (with no leading zeros) and state columns
logger.info(
"Attempting provider id based matches after removing leading zeros, as some sites such as NBER clean "
"them by removing leading zeros"
)
df_source_matched_no_zeros = df_npi_provider_cleaned.merge(
df_source,
left_on=["provider_id", "provider_id_state"],
right_on=["hclinic_provider_id_no_leading_zero", "hclinic_state"],
how="inner",
).compute()
logger.info(
"%s HCLINIC providers were matched using provider_id and provider_id_state columns (no leading zeros)",
df_source_matched_no_zeros.hclinic_provider_id_cleaned.nunique(),
)
df_source_matched_ploc_no_zeros = df_npi_provider_cleaned.merge(
df_source,
left_on=["provider_id", "p_loc_state"],
right_on=["hclinic_provider_id_no_leading_zero", "hclinic_state"],
how="inner",
).compute()
logger.info(
"%s HCLINIC providers were matched using provider_id and p_loc_state columns (no leading zeros)",
df_source_matched_ploc_no_zeros.hclinic_provider_id_cleaned.nunique(),
)
df_source_matched_mloc_no_zeros = df_npi_provider_cleaned.merge(
df_source,
left_on=["provider_id", "p_mail_state"],
right_on=["hclinic_provider_id_no_leading_zero", "hclinic_state"],
how="inner",
).compute()
logger.info(
"%s HCLINIC providers were matched using provider_id and m_loc_state columns (no leading zeros)",
df_source_matched_mloc_no_zeros.hclinic_provider_id_cleaned.nunique(),
)
df_source_matched_no_zeros = (
pd.concat(
[
df_source_matched_no_zeros,
df_source_matched_ploc_no_zeros,
df_source_matched_mloc_no_zeros,
],
ignore_index=True,
)
.reset_index()
.drop_duplicates(keep="first")
.reset_index(drop=True)
)
fname_nppes_matched_no_zeros = os.path.join(
fqhc_lookup_folder,
f"{source}_nppes_based_matches_no_leading_zeros.pickle",
)
df_source_matched_no_zeros.to_pickle(fname_nppes_matched_no_zeros)
df_source_matched_no_zeros = process_address_columns(
df_source_matched_no_zeros, logger_name
)
df_source_matched_no_zeros = compute_match_purity(
df_source_matched_no_zeros
)
df_source_matched_no_zeros.to_pickle(fname_nppes_matched_no_zeros)
logger.info(
"%s HCLINIC providers were matched using NPPES (no leading zeros). These matches are saved at %s",
df_source_matched_no_zeros.hclinic_provider_id_cleaned.nunique(),
fname_nppes_matched_no_zeros,
)
df_source_matched = (
pd.concat(
[df_source_matched, df_source_matched_no_zeros], ignore_index=True
)
.sort_values(by=["hclinic_provider_id", "npi"])
.drop_duplicates()
.reset_index(drop=True)
)
fname_source_perfect_matches = os.path.join(
fqhc_lookup_folder, f"{source}_perfect_matches.pickle"
)
df_source_matched.to_pickle(fname_source_perfect_matches)
logger.info(
"In total, %s of the %s provider ids have perfect matches. All the perfect matches are saved at %s",
df_source_matched.hclinic_provider_id.nunique(),
df_source.hclinic_provider_id.nunique(),
fname_source_perfect_matches,
)