Commit 63f91678 authored by Tim Kreuzer

update Dask JUWELS notebook
%% Cell type:markdown id: tags:
# Dask Extension
%% Cell type:markdown id: tags:
## If you have problems with this tutorial, try downloading the notebook:
%% Cell type:code id: tags:
``` python
!wget https://jupyter-jsc.fz-juelich.de/static/files/Dask_JUWELS.ipynb
```
%% Cell type:markdown id: tags:
This notebook gives you a short introduction to the Dask Extension on JUWELS. It allows you to run jobs on the compute nodes even if your JupyterLab is running interactively on a login node.
First, you have to define the project and partition the jobs should run in.
%% Cell type:code id: tags:
``` python
queue = "batch"    # batch, gpus, develgpus, etc.
project = "zam"    # your project: zam, training19xx, etc.
```
%% Cell type:markdown id: tags:
# Monte-Carlo Estimate of $\pi$
We want to estimate the number $\pi$ using a [Monte-Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods), exploiting that the area of a quarter circle of unit radius is $\pi/4$ and that hence the probability of any randomly chosen point in a unit square to lie in a unit circle centered at a corner of the unit square is $\pi/4$ as well. So for $N$ randomly chosen pairs $(x, y)$ with $x\in[0, 1)$ and $y\in[0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $(x^2 + y^2) < 1$ and estimate $\pi \approx 4 \cdot N_{circ} / N$.
[<img src="https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif"
     width="50%"
     align=top
     alt="PI monte-carlo estimate">](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)
%% Cell type:markdown id: tags:
## Core Lessons
- Setting up SLURM (and other jobqueue) clusters
- Scaling clusters
- Adaptive clusters
%% Cell type:markdown id: tags:
## Set up a Slurm cluster
We'll create a SLURM cluster and have a look at the job script used to start workers on the HPC scheduler.
%% Cell type:code id: tags:
``` python
import dask
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import os

cluster = SLURMCluster(
    cores=24,        # total cores per SLURM job
    processes=2,     # worker processes per SLURM job
    memory="100GB",  # total memory per SLURM job
    shebang="#!/usr/bin/env bash",
    queue=queue,
    scheduler_options={"dashboard_address": ":56764"},
    walltime="00:30:00",
    local_directory="/tmp",
    death_timeout="15s",
    interface="ib0",  # communicate via the InfiniBand interface
    log_directory=f'{os.environ["HOME"]}/dask_jobqueue_logs/',
    project=project,
)
```
%% Cell type:code id: tags:
``` python
print(cluster.job_script())
```
%% Cell type:code id: tags:
``` python
client = Client(cluster)
client
```
%% Cell type:markdown id: tags:
## You can visit the Dask dashboard at the following URL:
```
https://jupyter-jsc.fz-juelich.de/user/<user_name>/<lab_name>/proxy/<port>/status
```
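%% Cell type:markdown id: tags:
If you don't want to assemble the URL by hand, the client exposes the dashboard address directly. Note that behind Jupyter-JSC you may still need the proxied form above:
%% Cell type:code id: tags:
``` python
# Address of the scheduler's dashboard as seen from this machine.
print(client.dashboard_link)
```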
%% Cell type:markdown id: tags:
## You can integrate it into your JupyterLab environment by putting the link into the Dask Extension
%% Cell type:markdown id: tags:
!["Dask"](https://jupyter-jsc.fz-juelich.de/hub/static/images/dask2.png "dask")
%% Cell type:markdown id: tags:
Afterwards you can click the orange buttons to open a new tab in your JupyterLab environment.
%% Cell type:markdown id: tags:
## Scale the cluster to two nodes
A look at the dashboard reveals that there are no workers in the cluster. Let's start 4 workers (in 2 SLURM jobs).
For the distinction between _workers_ and _jobs_, see [the Dask jobqueue docs](https://jobqueue.dask.org/en/latest/howitworks.html#workers-vs-jobs).
%% Cell type:code id: tags:
``` python
cluster.scale(4)  # scale to 4 _workers_
```
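%% Cell type:markdown id: tags:
Since we configured two worker processes per job, scaling to 4 workers submits 2 SLURM jobs. Equivalently, `dask_jobqueue` also lets you scale in units of jobs (shown here only as an alternative to the call above):
%% Cell type:code id: tags:
``` python
# With processes=2, requesting 2 jobs is equivalent to cluster.scale(4).
cluster.scale(jobs=2)
```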
%% Cell type:markdown id: tags:
## The Monte Carlo Method
%% Cell type:code id: tags:
``` python
import dask.array as da
import numpy as np


def calc_pi_mc(size_in_bytes, chunksize_in_bytes=200e6):
    """Calculate PI using a Monte Carlo estimate."""

    # Number of double-precision (8-byte) values in total and per chunk.
    size = int(size_in_bytes / 8)
    chunksize = int(chunksize_in_bytes / 8)

    xy = da.random.uniform(0, 1, size=(size // 2, 2), chunks=(chunksize // 2, 2))

    in_circle = (xy ** 2).sum(axis=-1) < 1
    pi = 4 * in_circle.mean()

    return pi


def print_pi_stats(size, pi, time_delta, num_workers):
    """Print pi, calculate offset from true value, and print some stats."""
    print(
        f"{size / 1e9} GB\n"
        f"\tMC pi: {pi : 13.11f}"
        f"\tErr: {abs(pi - np.pi) : 10.3e}\n"
        f"\tWorkers: {num_workers}"
        f"\t\tTime: {time_delta : 7.3f}s"
    )
```
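%% Cell type:markdown id: tags:
A quick smoke test on a small array (1 MB here, an arbitrary choice) confirms the estimator works before we throw gigabytes at it:
%% Cell type:code id: tags:
``` python
# ~1 MB of doubles, i.e. 62,500 random points, computed on the cluster.
calc_pi_mc(1e6).compute()
```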
%% Cell type:markdown id: tags:
## The actual calculations
We loop over different volumes of double-precision random numbers and estimate $\pi$ as described above.
%% Cell type:code id: tags:
``` python
from time import time, sleep
```
%% Cell type:code id: tags:
``` python
for size in (1e9 * n for n in (1, 10, 100)):

    start = time()
    pi = calc_pi_mc(size).compute()
    elaps = time() - start

    print_pi_stats(
        size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)
    )
```
%% Cell type:markdown id: tags:
## Is it running?
%% Cell type:markdown id: tags:
To check whether jobs have been started for you, run the following command in a terminal:
```
squeue | grep ${USER}
```
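%% Cell type:markdown id: tags:
You can also run the check straight from the notebook with IPython's `!` shell escape (`$$` passes a literal `$` through to the shell):
%% Cell type:code id: tags:
``` python
!squeue -u $$USER
```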
%% Cell type:markdown id: tags:
## Scaling the Cluster to twice its size
We double the number of workers and then re-run the experiments.
%% Cell type:code id: tags:
``` python
new_num_workers = 2 * len(cluster.scheduler.workers)

print(f"Scaling from {len(cluster.scheduler.workers)} to {new_num_workers} workers.")

cluster.scale(new_num_workers)

sleep(10)
```
%% Cell type:code id: tags:
``` python
client
```
%% Cell type:markdown id: tags:
## Re-run same experiments with doubled cluster
%% Cell type:code id: tags:
``` python
for size in (1e9 * n for n in (1, 10, 100)):

    start = time()
    pi = calc_pi_mc(size).compute()
    elaps = time() - start

    print_pi_stats(
        size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)
    )
```
%% Cell type:markdown id: tags:
## Automatically Scaling the Cluster
We want each calculation to take only a few seconds. Dask will try to add more workers to the cluster when workloads are high and remove workers when they are idle.
_**Watch** how the cluster scales down to the minimum a few seconds after being made adaptive._
%% Cell type:code id: tags:
``` python
ca = cluster.adapt(minimum=4, maximum=100)

sleep(4)  # Allow for scale-down
```
%% Cell type:code id: tags:
``` python
client
```
%% Cell type:markdown id: tags:
## Repeat the calculation from above with larger workloads
(And watch the dashboard!)
%% Cell type:code id: tags:
``` python
for size in (n * 1e9 for n in (1, 10, 100)):

    start = time()
    pi = calc_pi_mc(size, min(size / 1000, 500e6)).compute()
    elaps = time() - start

    print_pi_stats(
        size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)
    )

    sleep(20)  # allow for scale-down time
```
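%% Cell type:markdown id: tags:
When you are done, it is good practice to release the SLURM jobs by shutting the cluster down explicitly (optional; the walltime limit would end them anyway):
%% Cell type:code id: tags:
``` python
# Disconnect the client and stop the scheduler and all workers.
client.close()
cluster.close()
```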