Parallel python with dask and jupyter
The dask framework provides an incredibly useful environment for parallel execution of python code in interactive settings (e.g. jupyter) or batch mode. Its key features are (from what I’ve seen so far):
- Representation of threading, multiprocessing, and distributed computing with one unified API and CLI (see the short sketch after this list)
- Abstraction of HPC schedulers (PBS, Moab, SLURM, …)
- Data structures for distributed computing with pandas and numpy syntax
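To illustrate the first and third points, here is a minimal sketch (not from the original setup) showing that the same dask.array computation can be executed by local threads, local processes, or a distributed cluster simply by switching the scheduler:

import dask.array as da

# Build a lazy 10000 x 10000 random array, split into 1000 x 1000 chunks.
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
result = x.mean()

# The same task graph can be run by different schedulers.
print(result.compute(scheduler="threads"))    # local thread pool
print(result.compute(scheduler="processes"))  # local multiprocessing
# With a dask.distributed Client attached (e.g. via dask_jobqueue, see below),
# result.compute() would run on the cluster's workers instead.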
Dask-jobqueue
The package dask_jobqueue seems to me to be the most user-friendly when it comes to parallelization on HPC clusters with a scheduling system such as SLURM.
For now, the most interesting question for me is how dask maps the parameters given to the cluster API and the cluster.scale method to the parameters usually given in a SLURM batch job script and to the mpirun parameters.
But first things first. A typical notebook using dask starts off with the configuration of the cluster:
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=12,             # Number of cores per job (SLURM --cpus-per-task)
    memory="16GB",        # Total memory per job (SLURM --mem)
    walltime="00:10:00",  # Expected time to complete a job (SLURM -t)
)
The basic unit of reference, against which all dask parameters seem to be defined, is the job. Quoting from the dask-jobqueue documentation: “A job is a [set of] resource[s] submitted to, and managed by, the job queueing system. One job may include one or more workers.” And: “A worker is a python object that represents a node in a dask cluster.” The aim of this post is to provide more substantial explanations of these terms and to understand how they relate precisely to the above-mentioned SLURM and MPI parameters.
In the above snippet, jobs are not defined. Typically, below the cluster initialization, one finds a call to cluster.scale():
cluster.scale(jobs=2)
Dask will then request the appropriate resources through SLURM.
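To actually run computations on the scaled cluster, one typically also attaches a dask.distributed Client. A minimal sketch, with a purely hypothetical toy workload just to illustrate the round trip:

from dask.distributed import Client

client = Client(cluster)  # connect to the SLURM-backed cluster defined above

# Map a trivial function over some inputs and collect the results.
futures = client.map(lambda x: x ** 2, range(100))
print(sum(client.gather(futures)))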
For the cluster configured and scaled above, the resulting job script is:
print(cluster.job_script())
#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=12
#SBATCH --mem=15G
#SBATCH -t 00:10:00
/home/grotec/miniconda3/bin/python -m distributed.cli.dask_worker \
    tcp://172.16.3.151:51525 \
    --nthreads 3 \
    --nprocs 4 \
    --memory-limit 4.00GB \
    --name dummy-name \
    --nanny \
    --death-timeout 60 \
    --protocol tcp://
Let’s try to break this down and understand how and why the SLURM and MPI parameters have been chosen this way:
#SBATCH -n 1
The -n option in SLURM specifies the number of tasks (long form: --ntasks). Since each dask job is represented by one SLURM job script, I conclude with a certain reservation that one dask job is also represented by one SLURM task and that -n 1 fixes the equivalence between task and job. Dask will thus submit as many separate SLURM jobs as requested via cluster.scale(jobs=...), rather than one big job with ntasks set to the number of requested (dask) jobs.
#SBATCH --cpus-per-task=12
For each task (job), SLURM will allocate 12 CPUs. This corresponds to the cores=12 argument in the initialization of the SLURMCluster. So one core in a dask cluster is one CPU per task in SLURM.
Let’s now look at the command line arguments to python -m distributed.cli.dask_worker:
--nthreads 3
--nprocs 4
A physical CPU in modern computer hardware consists of ncore cores, and each core can hold a number of (hyper)threads (see this FAQ at the SCG Stanford Genomics Cluster). This structure is also represented in SLURM and Dask. Dask maps one process (the elementary unit of computation) to one core. The number of processes can be given as an argument to the SLURMCluster constructor, but by default it is set as processes ~= sqrt(cores). Subsequently, the number of threads is set as nthreads = cores / processes, such that the number of processes and the number of threads per process are approximately equal.
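As a quick sanity check, here is a rough sketch of that heuristic (the exact rounding used by dask_jobqueue may differ) applied to the cores=12 example above:

from math import ceil, sqrt

cores = 12
processes = ceil(sqrt(cores))   # ~= sqrt(12) -> 4
nthreads = cores // processes   # 12 / 4     -> 3
print(processes, nthreads)      # matches --nprocs 4 --nthreads 3 in the job script above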
--nanny
The --nanny option (activated by default) tells dask to start up an extra process (the “nanny process”) that monitors the actual worker processes.
Another option is to specify the resources to be requested directly:
cluster.scale(cores=48)  # Here cores is the total number of cores to be allocated.
or
cluster.scale(memory="200 GB")
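As I understand it, dask translates such a request back into a number of jobs using the per-job resources defined in the SLURMCluster constructor. A small sketch of that arithmetic (assuming the cores=12 configuration from above):

from math import ceil

cores_per_job = 12     # from the SLURMCluster configuration above
requested_cores = 48   # passed to cluster.scale(cores=...)
print(ceil(requested_cores / cores_per_job))  # -> 4 jobs submitted to SLURM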