Dask and Jupyter

Parallel python with dask and jupyter

The dask framework provides an incredibly useful environment for parallel execution of python code in interactive settings (e.g. jupyter) or batch mode. Its key features are (from what I’ve seen so far):

  • Representation of threading, multiprocessing, and distributed computing with one unified API and CLI.
  • Abstraction of HPC schedulers (PBS, Moab, SLURM, …)
  • Data structures for distributed computing with pandas and numpy syntax (see the example right after this list)
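
To illustrate the last point, here is a minimal sketch (my own example, not taken from the dask documentation) using dask.array, whose interface mirrors numpy:

import dask.array as da

# A large array split into chunks; operations build a lazy task graph
# that only runs once .compute() is called.
x = da.random.random((10000, 10000), chunks=(1000, 1000))
print(x.mean().compute())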

Dask-jobqueue

The package dask_jobqueue seems to me to be the most user-friendly when it comes to parallelization on HPC clusters with a scheduling system such as SLURM. For now, the most interesting question for me is to determine how dask maps the parameters given to the cluster API and the cluster.scale method to the parameters usually given in a SLURM batch job script and to the mpirun parameters.

But first things first. A typical notebook using dask starts off with the configuration of the cluster:

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
   cores=12,            # Number of cores per job (SLURM: --cpus-per-task)
   memory="16GB",       # Total memory per job (SLURM: --mem)
   walltime="00:10:00", # Expected time to complete a job (SLURM: -t)
)

The basic unit of reference, against which all dask parameters seem to be defined, is the job. Quoting from the dask-jobqueue documentation: “A job is a [set of] resource[s] submitted to, and managed by, the job queueing system. One job may include one or more workers.” And: “A worker is a python object that represents a node in a dask cluster.” The aim of this post is to provide more substantial explanations of these terms and to understand how exactly they relate to the above-mentioned SLURM and mpi parameters.

In the above snippet, jobs are not defined. Typically, one finds a call to cluster.scale() below the cluster initialization:

cluster.scale(jobs=2)

Dask will then request the appropriate resources through SLURM.

In the above example, the resulting job script is

print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=12
#SBATCH --mem=15G
#SBATCH -t 00:10:00

/home/grotec/miniconda3/bin/python -m distributed.cli.dask_worker \
    tcp://172.16.3.151:51525 \
    --nthreads 3 \
    --nprocs 4 \
    --memory-limit 4.00GB \
    --name dummy-name \
    --nanny \
    --death-timeout 60 \
    --protocol tcp://

Let’s try to break this down and understand how and why the SLURM options and the dask_worker command line arguments have been chosen this way:

#SBATCH -n 1

The -n option in SLURM specifies the number of tasks (long form: --ntasks). Since each dask job is represented by one SLURM job script, I conclude with a certain reservation that one dask job is also represented by one SLURM task and that -n 1 fixes the equivalence between task and job. Dask will then submit as many SLURM jobs as requested via the jobs argument (two in the example above) rather than one big job with ntasks set to the number of requested (dask) jobs.
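
As a quick sanity check (my own snippet, reusing the cluster object from above): the rendered script requests a single task no matter how many dask jobs are later requested via scale(); scaling simply submits more copies of this same script.

# job_script() only renders the batch script; nothing is submitted here.
assert "#SBATCH -n 1" in cluster.job_script()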

#SBATCH --cpus-per-task=12

For each task (job), SLURM will allocate 12 CPUs. This corresponds to the cores=12 argument in the initialization of the SLURMCluster. So one core in a dask cluster corresponds to one CPU per task in SLURM.
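
To convince oneself of this mapping, one can render the script for a different core count without submitting anything. This is a hedged sketch of mine; the arguments other than cores are simply copied from the snippet above:

from dask_jobqueue import SLURMCluster

# job_script() only renders the batch script, no job is submitted.
cluster24 = SLURMCluster(cores=24, memory="16GB", walltime="00:10:00")
assert "--cpus-per-task=24" in cluster24.job_script()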

Let’s now look at the command line arguments to python -m distributed.cli.dask_worker:

--nthreads 3
--nprocs 4

A physical CPU in modern computer hardware consists of a number of cores, and each core can hold a number of (hyper)threads (see this FAQ at SCG Stanford Genomics Cluster). This structure is also represented in SLURM and dask. Dask maps one process (the elementary unit of computation) to one core. The number of processes can be given as an argument to the SLURMCluster constructor, but by default it is set such that processes ~= sqrt(cores). Subsequently, the number of threads is set as nthreads = cores / processes, such that the number of processes and the number of threads per process are approximately equal. With cores=12 this yields the --nprocs 4 and --nthreads 3 seen in the job script above.
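
Here is a small illustrative sketch of that heuristic (my own code, not dask’s actual implementation): take the smallest divisor of cores that is at least sqrt(cores) as the number of processes, and give each process the remaining cores as threads.

import math

def split_cores(cores):
    # Illustrative only: smallest divisor of cores that is >= sqrt(cores).
    processes = min(f for f in range(1, cores + 1)
                    if cores % f == 0 and f >= math.sqrt(cores))
    nthreads = cores // processes
    return processes, nthreads

print(split_cores(12))  # (4, 3) -- matches --nprocs 4 / --nthreads 3 above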

--nanny

The --nanny option (activated by default) tells dask to start up an extra process (the “nanny process”) that monitors the actual worker processes.

Another option is to specify the resources to be requested directly:

cluster.scale(cores=48)   # Here `cores` is the total number of cores to be allocated.

or

cluster.scale(memory="200 GB")
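
For completeness, here is a minimal sketch (my own example, not from the original snippets) of what typically follows in the notebook: connect a client to the cluster and submit work, which is then executed on the dask workers once SLURM has granted the resources.

from dask.distributed import Client

client = Client(cluster)                 # connect to the SLURMCluster from above
future = client.submit(sum, range(100))  # runs on one of the dask workers
print(future.result())                   # 4950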