Scaling with Dask¶
All DataFrames in medicaid-utils are Dask DataFrames,
enabling lazy evaluation and distributed processing. The distributed package is
a required dependency and is installed automatically with medicaid-utils.
Local Cluster¶
For workstations and laptops:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=8,
    threads_per_worker=1,  # avoids GIL contention with pandas
    memory_limit="8GB",    # per worker
)
client = Client(cluster)
print(client.dashboard_link)  # Dask dashboard for monitoring
Sizing guidelines:
| State size | Examples | Recommended configuration |
|---|---|---|
| Small | WY, VT, ND | 4 workers, 4 GB each (16 GB total) |
| Medium | AL, IL, OH | 8 workers, 8 GB each (64 GB total) |
| Large | CA, NY, TX | 8–16 workers, 8+ GB each, or HPC cluster |
SLURM / HPC Clusters¶
For high-performance computing environments, use dask-jobqueue:
pip install dask-jobqueue
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
cluster = SLURMCluster(
    cores=4,
    memory="32GB",
    processes=1,
    walltime="04:00:00",
    queue="standard",
)
cluster.scale(jobs=10)  # submit 10 SLURM jobs
client = Client(cluster)
For PBS/Torque environments, substitute PBSCluster:
from dask_jobqueue import PBSCluster
cluster = PBSCluster(
    cores=4,
    memory="32GB",
    processes=1,
    walltime="04:00:00",
    queue="batch",
)
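With either cluster type, a fixed `cluster.scale(jobs=...)` call can be replaced with adaptive scaling, where the scheduler launches and retires batch jobs as the workload changes. A minimal sketch for SLURM (the worker bounds here are illustrative, not a medicaid-utils recommendation):

```python
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(
    cores=4,
    memory="32GB",
    processes=1,
    walltime="04:00:00",
    queue="standard",
)

# Instead of cluster.scale(jobs=10): let Dask grow and shrink the
# worker pool between these bounds based on pending work.
cluster.adapt(minimum=0, maximum=40)
client = Client(cluster)
```

Adaptive scaling is useful when cluster queue time is scarce, since idle workers release their allocations back to the scheduler.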
Without a Cluster¶
For debugging or small datasets, you can bypass the distributed scheduler:
import dask
# Single-threaded (easiest to debug)
dask.config.set(scheduler="synchronous")
# Threaded (no cluster overhead)
dask.config.set(scheduler="threads")
Performance Tips¶
Using tmp_folder¶
The tmp_folder parameter caches intermediate results to disk, reducing memory pressure:
from medicaid_utils.preprocessing import taf_ip
ip = taf_ip.TAFIP(
    year=2019, state="CA", data_root="/data/cms",
    tmp_folder="/scratch/tmp/",  # intermediate files written here
)
This is especially important for large states where the full dataset does not fit in memory.
Partition sizing¶
Aim for partitions of 50–200 MB. Too many small partitions create scheduling overhead; too few large partitions cause memory spills. You can check and adjust:
print(ip.dct_files["base"].npartitions)
# Repartition if needed
ip.dct_files["base"] = ip.dct_files["base"].repartition(npartitions=20)
Multi-state analysis¶
Process states sequentially and release memory between iterations:
import gc
import pandas as pd
from medicaid_utils.preprocessing import taf_ip
results = []
for state in ["CA", "NY", "TX", "FL", "IL"]:
    ip = taf_ip.TAFIP(year=2019, state=state, data_root="/data/cms")
    n_claims = len(ip.dct_files["base"])
    results.append({"state": state, "n_claims": n_claims})
    del ip
    gc.collect()

df_summary = pd.DataFrame(results)
Dask dashboard¶
The Dask dashboard (typically at http://localhost:8787) provides real-time monitoring
of task progress, memory usage, and worker status. Open the URL printed by
client.dashboard_link in your browser.
See also
Installation for dependency details,
Preprocessing Claims for constructor parameters including tmp_folder.