# Source code for medicaid_utils.common_utils.dataframe_utils

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import csv
import logging
import os
from typing import List, Optional

import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyarrow as pa

# sys.path.append("../../medicaid_utils")
from medicaid_utils.common_utils import recipes


def toggle_datetime_string(
    df: dd.DataFrame,
    lst_datetime_col: List[str],
    to_string: bool = True,
) -> dd.DataFrame:
    """Toggles date columns in the passed dataframe to string/ datetime
    types.

    Parameters
    ----------
    df : dd.DataFrame
        Dask dataframe
    lst_datetime_col : list of str
        List of datetime column names to toggle
    to_string : bool, default=True
        True to convert to string, False to convert to datetime

    Returns
    -------
    dd.DataFrame
    """
    # Only touch datetime columns that actually exist in this dataframe,
    # preserving the dataframe's own column order.
    lst_cols_to_toggle = [
        colname for colname in df.columns if colname in lst_datetime_col
    ]
    for colname in lst_cols_to_toggle:
        if to_string:
            # Datetime -> 'YYYYMMDD' string; missing values become ''.
            df = df.map_partitions(
                lambda pdf, _col=colname: pdf.assign(
                    **{
                        _col: pdf[_col]
                        .dt.strftime("%Y%m%d")
                        .replace("NaT", "")
                        .fillna("")
                    }
                )
            )
        else:
            # 'YYYYMMDD' string -> datetime; unparseable values become NaT.
            df = df.map_partitions(
                lambda pdf, _col=colname: pdf.assign(
                    **{
                        _col: pd.to_datetime(
                            pdf[_col], format="%Y%m%d", errors="coerce"
                        )
                    }
                )
            )
    return df
def convert_ddcols_to_datetime(
    df: dd.DataFrame, lst_col: List[str]
) -> dd.DataFrame:
    """Convert the listed columns of a dask dataframe to datetime type.

    Parameters
    ----------
    df : dd.DataFrame
        Dask dataframe
    lst_col : list of str
        Names of columns holding 'YYYYMMDD' values to parse; unparseable
        entries become NaT (``errors="coerce"``)

    Returns
    -------
    dd.DataFrame
    """

    def _parse_dates(pdf: pd.DataFrame) -> pd.DataFrame:
        # Parse every requested column within a single partition.
        return pdf.assign(
            **{
                colname: pd.to_datetime(
                    pdf[colname], format="%Y%m%d", errors="coerce"
                )
                for colname in lst_col
            }
        )

    return df.map_partitions(_parse_dates)
[docs] def copy_ddcols( df: dd.DataFrame, lst_col: List[str], lst_new_names: List[str] ) -> dd.DataFrame: df = df.map_partitions( lambda pdf: pdf.assign( **{ lst_new_names[i]: pdf[lst_col[i]] for i in range(len(lst_col)) } ) ) return df
[docs] def get_reduced_column_names( multiidx_df_columns: pd.MultiIndex, combine_levels: bool = False ) -> List[str]: return [ (i[1] or i[0]) if not combine_levels else (i[1] + "_" + i[0]) for i in multiidx_df_columns ]
def sas_to_pandas(filename: str) -> pd.DataFrame:
    """Read a SAS dataset into pandas, decoding byte columns as UTF-8.

    ``pd.read_sas`` returns non-numeric columns as raw bytes; every
    column that is not float64 is decoded to str here.

    Parameters
    ----------
    filename : str
        Path to the SAS file

    Returns
    -------
    pd.DataFrame
    """
    df = pd.read_sas(filename)
    set_float_cols = set(
        df.select_dtypes(include=[np.float64]).columns
    )
    for colname in df.columns:
        if colname not in set_float_cols:
            df[colname] = df[colname].str.decode("utf-8")
    return df
def _value_counts_chunk(s: pd.Series) -> pd.Series:
    # Chunk step: per-partition value counts of the grouped series.
    return s.value_counts()


def _value_counts_agg(s: pd.Series) -> pd.Series:
    # Aggregate step: dask hands back a SeriesGroupBy over the chunk
    # results; `_selected_obj` is a private pandas attribute exposing the
    # underlying Series — NOTE(review): fragile across pandas versions.
    s = s._selected_obj
    # Sum counts over all index levels to combine per-partition tallies.
    return s.groupby(level=list(range(s.index.nlevels))).sum()


def _value_counts_finalize(s: pd.Series) -> pd.Series:
    # Finalize step: aggregated counts are already in final form.
    return s


# Custom dask groupby aggregation replicating pandas' value_counts,
# usable as df.groupby(...).agg(dask_groupby_value_counts).
dask_groupby_value_counts = dd.Aggregation(
    "value_counts",
    _value_counts_chunk,
    _value_counts_agg,
    _value_counts_finalize,
)
def safe_convert_int_to_str(
    df: dd.DataFrame, lst_col: List[str]
) -> dd.DataFrame:
    """Convert numeric-looking columns to clean integer strings.

    NaN/None become '', and float-formatted numbers (e.g. '12.0',
    '1.2e1') are reduced to their integer string form ('12'); any other
    value is passed through unchanged.

    Parameters
    ----------
    df : dd.DataFrame
        Dask dataframe
    lst_col : list of str
        Names of columns to clean

    Returns
    -------
    dd.DataFrame
    """

    def _strip_float_formatting(x: str) -> str:
        # Logical and/or (short-circuit) replaces the original bitwise
        # &/| chain, which always evaluated every operand.
        if (
            x != ""
            and recipes.is_number(x)
            and ("." in x or "e" in x or "E" in x)
        ):
            return str(int(float(x)))
        return x

    df = df.map_partitions(
        lambda pdf: pdf.assign(
            **{
                col: pdf[col]
                .where(~pdf[col].isna(), "")
                .astype(str)
                .replace(["nan", "None"], "")
                .apply(_strip_float_formatting)
                for col in lst_col
            }
        )
    )
    return df
def fix_index(
    df: dd.DataFrame, index_name: str, drop_column: bool = True
) -> dd.DataFrame:
    """Ensure a dask dataframe is indexed by ``index_name`` with known
    divisions.

    Parameters
    ----------
    df : dd.DataFrame
        Dask dataframe
    index_name : str
        Name of the column (or current index) to index by
    drop_column : bool, default=True
        When True, the index is not duplicated as a regular column;
        when False, a same-named column is retained alongside the index

    Returns
    -------
    dd.DataFrame

    Raises
    ------
    ValueError
        If ``index_name`` is neither a column nor the current index
    """
    # `or` replaces the original bitwise `|`: short-circuits and is the
    # idiomatic boolean connective. Re-index when the index is wrong OR
    # divisions are unknown (divisions[0] is None).
    if (df.index.name != index_name) or (df.divisions[0] is None):
        if index_name not in df.columns:
            if df.index.name != index_name:
                raise ValueError(f"{index_name} not in dataframe")
            # Index already holds the values; materialize as a column so
            # set_index can recompute divisions.
            df[index_name] = df.index
        df = df.set_index(index_name, drop=drop_column)
    elif not drop_column:
        # Index is correct; mirror it into a column as requested.
        df[index_name] = df.index
    else:
        # Index is correct; drop any duplicate same-named column.
        df = df[[col for col in df.columns if col != index_name]]
    return df
def prepare_dtypes_for_csv(
    df_temp: dd.DataFrame, df_schema: pd.DataFrame
) -> dd.DataFrame:
    """Coerce dataframe columns to CSV-friendly dtypes per a schema.

    int/str columns are converted to strings with missing values as ''
    and float-formatted integers reduced to plain integer strings;
    float columns are coerced to numeric (invalid entries become NaN).

    Parameters
    ----------
    df_temp : dd.DataFrame
        Dask dataframe to clean
    df_schema : pd.DataFrame
        Schema with 'name' and 'python_type' columns

    Returns
    -------
    dd.DataFrame
    """
    # Hoisted: the original recomputed these schema filters in every
    # map_partitions block.
    lst_text_cols = df_schema.loc[
        df_schema["python_type"].isin(["int", "str"])
    ]["name"].tolist()
    lst_float_cols = df_schema.loc[
        df_schema["python_type"].isin(["float"])
    ]["name"].tolist()
    # Pass 1: stringify, mapping NaN to ''.
    df_temp = df_temp.map_partitions(
        lambda pdf: pdf.assign(
            **{
                col: pdf[col].where(~pdf[col].isna(), "").astype(str)
                for col in lst_text_cols
            }
        )
    )
    # Pass 2: scrub stringified missing markers.
    df_temp = df_temp.map_partitions(
        lambda pdf: pdf.assign(
            **{
                col: pdf[col].replace(["nan", "None"], "")
                for col in lst_text_cols
            }
        )
    )
    # Pass 3: '12.0' -> '12' for numeric-looking strings.
    df_temp = df_temp.map_partitions(
        lambda pdf: pdf.assign(
            **{
                col: pdf[col].apply(
                    lambda x: str(int(float(x)))
                    if (recipes.is_number(x))
                    else x
                )
                for col in lst_text_cols
            }
        )
    )
    # Float columns: coerce, invalid entries become NaN.
    df_temp = df_temp.map_partitions(
        lambda pdf: pdf.assign(
            **{
                col: pd.to_numeric(pdf[col], errors="coerce")
                for col in lst_float_cols
            }
        )
    )
    return df_temp
def export(
    df: dd.DataFrame,
    pq_engine: str,
    output_filename: str,
    pq_location: str,
    _csv_location: str,
    lst_datetime_col: List[str],
    is_dask: bool = True,
    n_rows: int = -1,
    do_csv: bool = True,
    df_schema: Optional[pd.DataFrame] = None,
    logger_name: str = "Dataframe utils",
    rewrite: bool = False,
    do_parquet: bool = True,
) -> None:
    """Exports a Dask DataFrame to parquet and/or CSV.

    Datetime columns are serialized as 'YYYYMMDD' strings for the export
    and toggled back afterwards. For the pyarrow engine, a sample of the
    data is used to infer an explicit parquet schema so all-null columns
    get a concrete type. When ``rewrite`` is True, parquet is written to
    a temp directory and atomically renamed over the target.

    Parameters
    ----------
    df : dd.DataFrame
        Dask dataframe to export
    pq_engine : str
        Parquet engine name ('pyarrow' enables schema sampling)
    output_filename : str
        CSV output path; '.csv' is replaced by '.sas.csv' in dask mode
    pq_location : str
        Parquet output directory
    _csv_location : str
        Unused here (kept for interface compatibility)
    lst_datetime_col : list of str
        Datetime columns to toggle to strings during export
    is_dask : bool, default=True
        True for dask (partitioned) output, False for single-file output
    n_rows : int, default=-1
        Row count; computed via len(df) when negative
    do_csv : bool, default=True
        Whether to also write CSV output
    df_schema : pd.DataFrame, optional
        Schema (name/python_type/sr_no) used to format CSV columns
    logger_name : str, default='Dataframe utils'
        Logger to report progress to
    rewrite : bool, default=False
        Write parquet to a temp dir and swap it in place on success
    do_parquet : bool, default=True
        Whether to write parquet output

    Returns
    -------
    None
    """
    if df_schema is None:
        df_schema = pd.DataFrame()
    # df = df.persist()
    logger = logging.getLogger(logger_name)
    if n_rows < 0:
        n_rows = len(df)
    if df.head().shape[0] < 1:
        # First partition looks empty: rebalance by estimated row count,
        # capped so at least one partition remains.
        df = df.repartition(
            npartitions=min(
                max(int(n_rows / 100000), 1), max(df.npartitions - 2, 1)
            )
        ).persist()
    else:
        df = df.repartition(
            partition_size="100MB"
        ).persist()  # df.repartition(npartitions=max(int(n_rows / 100000), df.npartitions)).persist()
    # Serialize datetimes as 'YYYYMMDD' strings for export.
    df = toggle_datetime_string(df, lst_datetime_col, to_string=True)
    if is_dask:
        if do_parquet:
            # Group-writable output with setgid bit (mode 2770) so group
            # members inherit access.
            os.umask(int("007", base=8))
            os.makedirs(pq_location, exist_ok=True, mode=int("2770", base=8))
            os.chmod(pq_location, mode=int("2770", base=8))
            if not rewrite:
                # Not rewriting in place: clear any stale output first.
                recipes.remove_ignore_if_not_exists(pq_location)
            if pq_engine == "pyarrow":
                logger.info("NROWS: %s(%s)", n_rows, pq_location)
                # Sample up to ~250k rows to infer a concrete schema.
                df_sample = df.sample(
                    frac=min(1, 250000 / max(n_rows, 250000)),
                    replace=False,
                    random_state=100,
                ).compute()
                lst_null_cols = (
                    df_sample.columns[df_sample.isnull().all(axis=0)]
                    if (n_rows > 0)
                    else []
                )
                # All-null columns would infer as null type in pyarrow;
                # force them to float (or string for datetime columns).
                df_sample = df_sample.assign(
                    **dict(
                        [
                            (col, df_sample[col].astype("float"))
                            for col in lst_null_cols
                            if (col not in lst_datetime_col)
                        ]
                        + [
                            (col, df_sample[col].fillna("").astype(str))
                            for col in lst_null_cols
                            if (col in lst_datetime_col)
                        ]
                    )
                )
                # df_sample.to_csv(os.path.join(csv_location, 'sample.csv'), index=False)
                schema_sample = pa.Schema.from_pandas(
                    df_sample.reset_index(drop=True)
                )
                # When rewriting, write to a temp dir first and swap in
                # below after success.
                df.to_parquet(
                    pq_location + ("_temp" if (rewrite) else ""),
                    # compression='xz',
                    engine=pq_engine,
                    write_index=False,
                    schema=schema_sample,
                )
                if rewrite:
                    recipes.remove_ignore_if_not_exists(pq_location)
                    os.rename(pq_location + "_temp", pq_location)
            else:
                # Non-pyarrow engines: write directly, no schema sampling.
                df.to_parquet(
                    pq_location,
                    # compression='xz',
                    engine=pq_engine,
                    write_index=False,
                )
        if do_csv:
            # Format per schema and order columns by the schema's sr_no.
            df_formatted = prepare_dtypes_for_csv(df, df_schema)[
                df_schema.sort_values(by="sr_no")["name"].tolist()
            ]
            recipes.remove_ignore_if_not_exists(output_filename)
            # df_formatted.to_csv(output_filename, single_file=True, quoting=csv.QUOTE_NONNUMERIC, quotechar='"',
            #                     float_format='%f', index=False)
            df_formatted.to_csv(
                output_filename.replace(".csv", ".sas.csv"),
                single_file=True,
                # header=False,
                quoting=csv.QUOTE_NONNUMERIC,
                quotechar='"',
                float_format="%f",
                index=False,
            )
    else:
        # Non-dask path: single gzip parquet file named after the CSV.
        df.to_parquet(
            os.path.join(
                pq_location,
                os.path.basename(os.path.normpath(output_filename))
                + ".parquet.gzip",
            ),
            compression="gzip",
            index=False,
        )
        if do_csv:
            df.to_csv(output_filename, header=True, index=False)
    # Restore datetime dtypes. NOTE(review): rebinds the local `df` only;
    # the caller's dataframe object is unchanged by this line.
    df = toggle_datetime_string(df, lst_datetime_col, to_string=False)
def get_first_day_gap(
    df: pd.DataFrame,
    index_col: str,
    time_col: str,
    start_date_col: str,
    threshold: int,
) -> pd.DataFrame:
    """Find, per ``index_col`` value, the date the first long care gap
    begins.

    A gap is the day-count between consecutive ``time_col`` values
    within a group. Gaps of at least ``threshold`` days whose preceding
    visit falls on/after ``start_date_col`` qualify; the earliest such
    preceding visit date is reported as ``gap_start_date``.

    Parameters
    ----------
    df : pd.DataFrame
        Long-format dataframe of dated events
    index_col : str
        Grouping column (e.g. a person identifier)
    time_col : str
        Datetime column of event dates
    start_date_col : str
        Datetime column giving each row's observation start date
    threshold : int
        Minimum gap length in days

    Returns
    -------
    pd.DataFrame
        One row per qualifying group: [index_col, 'gap_start_date']
    """
    df = df.sort_values(by=[index_col, time_col])
    # Within-group day gaps and the previous event date for each row.
    grouped_times = df.groupby(index_col)[time_col]
    df = df.assign(
        gap_dur=grouped_times.diff().dt.days.fillna(0),
        prev_care_date=grouped_times.shift(),
    )
    df = df.assign(
        gap_gt_threshold=(df["gap_dur"] >= threshold).astype(int)
    )
    # Keep rows whose gap qualifies and whose previous visit is inside
    # the observation window (NaT comparisons are False, dropping each
    # group's first row).
    in_window = df["prev_care_date"] >= df[start_date_col]
    df = df.loc[in_window & (df["gap_gt_threshold"] == 1)]
    # Earliest qualifying gap start per group.
    df = df.assign(
        gap_start_date=df.groupby(index_col)["prev_care_date"].transform(
            "min"
        )
    )
    result = (
        df[[index_col, "gap_start_date"]]
        .drop_duplicates()
        .reset_index(drop=True)
    )
    return result