#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import csv
import logging
import os
from typing import List, Optional
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyarrow as pa
# sys.path.append("../../medicaid_utils")
from medicaid_utils.common_utils import recipes
# [docs]  (Sphinx HTML-source link artifact; commented out so the module stays importable)
def toggle_datetime_string(
    df: dd.DataFrame, lst_datetime_col: List[str], to_string: bool = True
) -> dd.DataFrame:
    """Toggle the given date columns between datetime and YYYYMMDD strings.

    Parameters
    ----------
    df : dd.DataFrame
        Dask dataframe
    lst_datetime_col : list of str
        Names of the datetime columns to toggle
    to_string : bool, default=True
        When True, format datetimes as YYYYMMDD strings (missing values
        become empty strings); when False, parse YYYYMMDD strings back to
        datetimes (unparseable values become NaT)

    Returns
    -------
    dd.DataFrame
    """

    def _as_string(pdf: pd.DataFrame, target: str) -> pd.DataFrame:
        # NaT formats to a missing value; normalize every missing marker
        # to the empty string.
        formatted = (
            pdf[target].dt.strftime("%Y%m%d").replace("NaT", "").fillna("")
        )
        return pdf.assign(**{target: formatted})

    def _as_datetime(pdf: pd.DataFrame, target: str) -> pd.DataFrame:
        parsed = pd.to_datetime(pdf[target], format="%Y%m%d", errors="coerce")
        return pdf.assign(**{target: parsed})

    converter = _as_string if to_string else _as_datetime
    for col in df.columns:
        if col in lst_datetime_col:
            df = df.map_partitions(converter, col)
    return df
# [docs]  (Sphinx HTML-source link artifact; commented out so the module stays importable)
def convert_ddcols_to_datetime(df: dd.DataFrame, lst_col: List[str]) -> dd.DataFrame:
    """Convert the listed columns of a dask dataframe to datetime dtype.

    Values are parsed with the ``%Y%m%d`` format; anything that does not
    parse becomes ``NaT`` (``errors="coerce"``).

    Parameters
    ----------
    df : dd.DataFrame
        Dask dataframe
    lst_col : list of str
        Names of the columns to convert

    Returns
    -------
    dd.DataFrame
        Dataframe with the listed columns converted to datetime. The
        original docstring claimed ``None``; the function has always
        returned the (lazily) converted dataframe.
    """
    df = df.map_partitions(
        lambda pdf: pdf.assign(
            **{
                col: pd.to_datetime(pdf[col], format="%Y%m%d", errors="coerce")
                for col in lst_col
            }
        )
    )
    return df
# [docs]  (Sphinx HTML-source link artifact; commented out so the module stays importable)
def copy_ddcols(
    df: dd.DataFrame, lst_col: List[str], lst_new_names: List[str]
) -> dd.DataFrame:
    """Duplicate existing columns under new names.

    Parameters
    ----------
    df : dd.DataFrame
        Dask dataframe
    lst_col : list of str
        Names of the source columns to copy
    lst_new_names : list of str
        New column names, positionally paired with ``lst_col``

    Returns
    -------
    dd.DataFrame

    Raises
    ------
    ValueError
        If the two name lists differ in length. (Previously a mismatch
        surfaced as an IndexError at compute time, or was silently
        ignored when ``lst_new_names`` was the longer list.)
    """
    if len(lst_col) != len(lst_new_names):
        raise ValueError("lst_col and lst_new_names must have the same length")
    # Pair names up-front so the per-partition closure carries one dict
    # instead of indexing two parallel lists.
    dct_copy = dict(zip(lst_new_names, lst_col))
    df = df.map_partitions(
        lambda pdf: pdf.assign(
            **{new: pdf[src] for new, src in dct_copy.items()}
        )
    )
    return df
# [docs]  (Sphinx HTML-source link artifact; commented out so the module stays importable)
def get_reduced_column_names(
    multiidx_df_columns: pd.MultiIndex, combine_levels: bool = False
) -> List[str]:
    """Flatten two-level column labels into plain strings.

    For each ``(level0, level1)`` pair the result is ``level1`` when it is
    truthy and ``level0`` otherwise; with ``combine_levels=True`` the two
    are joined as ``"level1_level0"`` instead.
    """
    if combine_levels:
        return [second + "_" + first for first, second in multiidx_df_columns]
    return [second or first for first, second in multiidx_df_columns]
# [docs]  (Sphinx HTML-source link artifact; commented out so the module stays importable)
def sas_to_pandas(filename: str) -> pd.DataFrame:
    """Read a SAS dataset and decode its byte-string columns to UTF-8.

    ``pd.read_sas`` loads character variables as raw bytes; every column
    that is not float64 is decoded so the result holds Python strings.
    """
    df = pd.read_sas(filename)
    float_cols = set(df.select_dtypes(include=[np.float64]).columns)
    for col in df.columns:
        if col in float_cols:
            continue
        df[col] = df[col].str.decode("utf-8")
    return df
def _value_counts_chunk(s: pd.Series) -> pd.Series:
    # Chunk step of the custom aggregation: count occurrences of each
    # value within one partition's group.
    return s.value_counts()
def _value_counts_agg(s: pd.Series) -> pd.Series:
    # Aggregate step: sum the per-chunk counts for identical values.
    # NOTE(review): `_selected_obj` is a private pandas groupby attribute
    # used to reach the underlying series — confirm it still exists in the
    # pinned pandas version before upgrading.
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()
def _value_counts_finalize(s: pd.Series) -> pd.Series:
    # Finalize step: the summed counts are already in final form.
    return s
# Custom dask groupby aggregation equivalent to Series.value_counts(),
# e.g. df.groupby(key)[col].agg(dask_groupby_value_counts).
dask_groupby_value_counts = dd.Aggregation(
    "value_counts",
    _value_counts_chunk,
    _value_counts_agg,
    _value_counts_finalize,
)
# [docs]  (Sphinx HTML-source link artifact; commented out so the module stays importable)
def safe_convert_int_to_str(df: dd.DataFrame, lst_col: List[str]) -> dd.DataFrame:
    """Convert the listed columns to strings, stripping float artifacts.

    Missing values (NaN/None) become empty strings, and numeric strings
    that carry a decimal point or scientific-notation marker (e.g.
    ``"12.0"`` or ``"1e2"``) are rewritten as plain integer strings.
    """

    def _drop_float_suffix(val: str) -> str:
        # e.g. "12.0" -> "12"; non-numeric text passes through untouched.
        if (
            val != ""
            and recipes.is_number(val)
            and ("." in val or "e" in val or "E" in val)
        ):
            return str(int(float(val)))
        return val

    def _stringify(pdf: pd.DataFrame) -> pd.DataFrame:
        return pdf.assign(
            **{
                col: pdf[col]
                .where(~pdf[col].isna(), "")
                .astype(str)
                .replace(["nan", "None"], "")
                .apply(_drop_float_suffix)
                for col in lst_col
            }
        )

    return df.map_partitions(_stringify)
# [docs]  (Sphinx HTML-source link artifact; commented out so the module stays importable)
def fix_index(
    df: dd.DataFrame, index_name: str, drop_column: bool = True
) -> dd.DataFrame:
    """Ensure ``df`` is indexed by ``index_name`` with known divisions.

    Parameters
    ----------
    df : dd.DataFrame
        Dask dataframe
    index_name : str
        Name the index should have; must already be either the index name
        or an existing column
    drop_column : bool, default=True
        When True, ``index_name`` is dropped from the columns once set as
        the index; when False it is kept as both index and column

    Returns
    -------
    dd.DataFrame

    Raises
    ------
    ValueError
        If ``index_name`` is neither the current index name nor a column.
    """
    # Re-index when the index name is wrong OR divisions are unknown
    # (divisions[0] is None) — set_index recomputes the divisions.
    if (df.index.name != index_name) | (df.divisions[0] is None):
        if index_name not in df.columns:
            if df.index.name != index_name:
                raise ValueError(f"{index_name} not in dataframe")
            # Index already has the right name; materialize it as a column
            # so set_index can rebuild divisions from it.
            df[index_name] = df.index
        df = df.set_index(index_name, drop=drop_column)
    elif not drop_column:
        # Index is already correct; just mirror it into a column.
        df[index_name] = df.index
    else:
        # Index is correct and no duplicate column is wanted: drop any
        # column that shadows the index name.
        df = df[[col for col in df.columns if col != index_name]]
    return df
# [docs]  (Sphinx HTML-source link artifact; commented out so the module stays importable)
def prepare_dtypes_for_csv(
    df_temp: dd.DataFrame, df_schema: pd.DataFrame
) -> dd.DataFrame:
    """Coerce dataframe columns into CSV-friendly representations.

    Columns whose schema type is ``int`` or ``str`` become clean strings:
    missing values map to ``""`` and float artifacts such as ``"12.0"``
    become ``"12"``. Columns whose schema type is ``float`` are coerced to
    numeric, with unparseable values becoming NaN.

    Parameters
    ----------
    df_temp : dd.DataFrame
        Dask dataframe to clean
    df_schema : pd.DataFrame
        Schema with ``name`` and ``python_type`` columns

    Returns
    -------
    dd.DataFrame
    """
    # Compute the schema column lists once, outside the per-partition
    # lambdas; the original recomputed them inside every partition, three
    # separate times.
    lst_text_col = df_schema.loc[
        df_schema["python_type"].isin(["int", "str"]), "name"
    ].tolist()
    lst_float_col = df_schema.loc[
        df_schema["python_type"].isin(["float"]), "name"
    ].tolist()

    def _clean_text(pdf: pd.DataFrame) -> pd.DataFrame:
        # Blank out missing markers, then strip float formatting from
        # numeric strings (e.g. "12.0" -> "12"). Chaining the three steps
        # per column is equivalent to the original's three full passes.
        return pdf.assign(
            **{
                col: pdf[col]
                .where(~pdf[col].isna(), "")
                .astype(str)
                .replace(["nan", "None"], "")
                .apply(
                    lambda x: str(int(float(x)))
                    if recipes.is_number(x)
                    else x
                )
                for col in lst_text_col
            }
        )

    def _clean_float(pdf: pd.DataFrame) -> pd.DataFrame:
        return pdf.assign(
            **{
                col: pd.to_numeric(pdf[col], errors="coerce")
                for col in lst_float_col
            }
        )

    df_temp = df_temp.map_partitions(_clean_text)
    df_temp = df_temp.map_partitions(_clean_float)
    return df_temp
# [docs]  (Sphinx HTML-source link artifact; commented out so the module stays importable)
def export(
    df: dd.DataFrame,
    pq_engine: str,
    output_filename: str,
    pq_location: str,
    _csv_location: str,
    lst_datetime_col: List[str],
    is_dask: bool = True,
    n_rows: int = -1,
    do_csv: bool = True,
    df_schema: Optional[pd.DataFrame] = None,
    logger_name: str = "Dataframe utils",
    rewrite: bool = False,
    do_parquet: bool = True,
) -> None:
    """Exports a Dask DataFrame to parquet and/or CSV.

    Parameters
    ----------
    df : dd.DataFrame
        Dataframe to export
    pq_engine : str
        Parquet engine; "pyarrow" triggers explicit schema inference from
        a sample before writing
    output_filename : str
        CSV output path (in the dask path, ".csv" is rewritten to
        ".sas.csv")
    pq_location : str
        Folder for the parquet dataset
    _csv_location : str
        Unused here; kept for interface compatibility
    lst_datetime_col : list of str
        Datetime columns, serialized as YYYYMMDD strings for export
    is_dask : bool, default=True
        True for the dask export path, False for the single-file fallback
    n_rows : int, default=-1
        Row count if already known; computed via len(df) when negative
    do_csv : bool, default=True
        Also write a CSV copy
    df_schema : pd.DataFrame, optional
        Schema used to format CSV dtypes and order the CSV columns
    logger_name : str, default="Dataframe utils"
        Name of the logger progress is reported to
    rewrite : bool, default=False
        Write parquet to a "_temp" folder first, then swap it into place
    do_parquet : bool, default=True
        Write the parquet output
    """
    if df_schema is None:
        df_schema = pd.DataFrame()
    # df = df.persist()
    logger = logging.getLogger(logger_name)
    if n_rows < 0:
        # NOTE(review): len(df) forces a computation on a dask dataframe —
        # pass n_rows when it is already known.
        n_rows = len(df)
    if df.head().shape[0] < 1:
        # First partition is empty: repartition by estimated row count
        # (~100k rows per partition), never below one partition.
        df = df.repartition(
            npartitions=min(
                max(int(n_rows / 100000), 1), max(df.npartitions - 2, 1)
            )
        ).persist()
    else:
        df = df.repartition(
            partition_size="100MB"
        ).persist()  # df.repartition(npartitions=max(int(n_rows / 100000), df.npartitions)).persist()
    # Serialize datetimes as YYYYMMDD strings so both output formats agree.
    df = toggle_datetime_string(df, lst_datetime_col, to_string=True)
    if is_dask:
        if do_parquet:
            # Group-writable output; the setgid bit (2xxx) keeps the group
            # on files created inside the folder.
            os.umask(int("007", base=8))
            os.makedirs(pq_location, exist_ok=True, mode=int("2770", base=8))
            os.chmod(pq_location, mode=int("2770", base=8))
            if not rewrite:
                recipes.remove_ignore_if_not_exists(pq_location)
            if pq_engine == "pyarrow":
                logger.info("NROWS: %s(%s)", n_rows, pq_location)
                # Infer a pyarrow schema from a sample of up to ~250k rows
                # so all partitions are written with consistent types.
                df_sample = df.sample(
                    frac=min(1, 250000 / max(n_rows, 250000)),
                    replace=False,
                    random_state=100,
                ).compute()
                lst_null_cols = (
                    df_sample.columns[df_sample.isnull().all(axis=0)]
                    if (n_rows > 0)
                    else []
                )
                # All-null columns carry no type information: force them to
                # float (or to string for datetime columns) so the inferred
                # schema is stable.
                df_sample = df_sample.assign(
                    **dict(
                        [
                            (col, df_sample[col].astype("float"))
                            for col in lst_null_cols
                            if (col not in lst_datetime_col)
                        ]
                        + [
                            (col, df_sample[col].fillna("").astype(str))
                            for col in lst_null_cols
                            if (col in lst_datetime_col)
                        ]
                    )
                )
                # df_sample.to_csv(os.path.join(csv_location, 'sample.csv'), index=False)
                schema_sample = pa.Schema.from_pandas(
                    df_sample.reset_index(drop=True)
                )
                df.to_parquet(
                    pq_location + ("_temp" if (rewrite) else ""),
                    # compression='xz',
                    engine=pq_engine,
                    write_index=False,
                    schema=schema_sample,
                )
                if rewrite:
                    # Swap the freshly written temp dataset into place.
                    recipes.remove_ignore_if_not_exists(pq_location)
                    os.rename(pq_location + "_temp", pq_location)
            else:
                df.to_parquet(
                    pq_location,
                    # compression='xz',
                    engine=pq_engine,
                    write_index=False,
                )
        if do_csv:
            # Format dtypes per schema and order columns by schema sr_no.
            df_formatted = prepare_dtypes_for_csv(df, df_schema)[
                df_schema.sort_values(by="sr_no")["name"].tolist()
            ]
            recipes.remove_ignore_if_not_exists(output_filename)
            # df_formatted.to_csv(output_filename, single_file=True, quoting=csv.QUOTE_NONNUMERIC, quotechar='"',
            # float_format='%f', index=False)
            df_formatted.to_csv(
                output_filename.replace(".csv", ".sas.csv"),
                single_file=True,  # header=False,
                quoting=csv.QUOTE_NONNUMERIC,
                quotechar='"',
                float_format="%f",
                index=False,
            )
    else:
        # Non-dask fallback: a single gzip parquet file (and a plain CSV).
        df.to_parquet(
            os.path.join(
                pq_location,
                os.path.basename(os.path.normpath(output_filename))
                + ".parquet.gzip",
            ),
            compression="gzip",
            index=False,
        )
        if do_csv:
            df.to_csv(output_filename, header=True, index=False)
    # Restore datetime dtypes on the exported frame.
    df = toggle_datetime_string(df, lst_datetime_col, to_string=False)
# [docs]  (Sphinx HTML-source link artifact; commented out so the module stays importable)
def get_first_day_gap(
    df: pd.DataFrame,
    index_col: str,
    time_col: str,
    start_date_col: str,
    threshold: int,
) -> pd.DataFrame:
    """Find, per entity, the start date of its first qualifying care gap.

    A gap is a pair of consecutive records (ordered by ``time_col`` within
    each ``index_col`` group) at least ``threshold`` days apart, where the
    earlier record falls on or after ``start_date_col``. The result has one
    row per ``index_col`` value that has such a gap, with
    ``gap_start_date`` set to the earliest qualifying earlier-record date.
    """
    ordered = df.sort_values(by=[index_col, time_col])
    visit_dates = ordered.groupby(index_col)[time_col]
    ordered = ordered.assign(
        gap_dur=visit_dates.diff().dt.days.fillna(0),
        prev_care_date=visit_dates.shift(),
    )
    # Keep gaps that start on/after the start date (NaT compares False, so
    # each group's first record is excluded) and are long enough.
    in_window = ordered["prev_care_date"] >= ordered[start_date_col]
    long_enough = ordered["gap_dur"] >= threshold
    gaps = ordered.loc[in_window & long_enough]
    gaps = gaps.assign(
        gap_start_date=gaps.groupby(index_col)["prev_care_date"].transform(
            "min"
        )
    )
    result = gaps[[index_col, "gap_start_date"]].drop_duplicates()
    return result.reset_index(drop=True)