%% Cell type:markdown id: tags:
# Dask local cluster example
%% Cell type:markdown id: tags:
## What is Dask? (https://docs.dask.org/en/latest/)
* combine a blocked algorithm approach
* with dynamic and memory aware task scheduling
* to realise a parallel out-of-core NumPy clone
* optimized for interactive computational workloads
-----------------------------------
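%% Cell type:markdown id: tags:
As a minimal illustration of the blocked approach (an illustrative snippet, not part of the original workshop material): a `dask.array` is split into chunks, each chunk becomes a task in a task graph, and nothing is computed until `compute()` hands the graph to the scheduler.
%% Cell type:code id: tags:
``` python
import dask.array

# a 10,000 x 10,000 array of ones, split into 1,000 x 1,000 blocks;
# each block corresponds to one task in the task graph
x = dask.array.ones((10_000, 10_000), chunks=(1_000, 1_000))

# building the expression is lazy ...
total = (2 * x).sum()

# ... and compute() triggers the scheduler
print(total.compute())  # 2.0 * 10_000 * 10_000 = 200,000,000
```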
%% Cell type:markdown id: tags:
### WORKSHOP on DASK - HIGH THROUGHPUT COMPUTING WITH DASK
**Organisers:** Alan O’Cais, David Swenson
**Website:** https://www.cecam.org/workshop-details/1022
**Synopsis:**
High-throughput (task-based) computing is a flexible approach to parallelisation. It involves splitting a problem into loosely-coupled tasks. A scheduler then orchestrates the parallel execution of those tasks, allowing programs to adaptively scale their resource usage. E-CAM has extended the data-analytics framework Dask with a capable and efficient library to handle such workloads. This workshop will be held as a series of virtual seminars/tutorials on tools in the Dask HPC ecosystem.
**Programme:**
- 21 January 2021, 3pm CET (2pm UTC): Dask - a flexible library for parallel computing in Python
  - YouTube link: https://youtu.be/Tl8rO-baKuY
  - GitHub Repo: https://github.com/jacobtomlinson/dask-video-tutorial-2020
- 4 February 2021, 3pm CET (2pm UTC): Dask-Jobqueue - a library that integrates Dask with standard HPC queuing systems, such as SLURM or PBS
  - YouTube link: https://youtu.be/iNxhHXzmJ1w
  - GitHub Repo: https://github.com/ExaESM-WP4/workshop-Dask-Jobqueue-cecam-2021-02
- 11 February 2021, 3pm CET (2pm UTC): Jobqueue-Features - a library that enables functionality aimed at enhancing scalability
  - YouTube link: https://youtu.be/FpMua8iJeTk
  - GitHub Repo: https://github.com/E-CAM/jobqueue_features_workshop_materials
------------------------------------
%% Cell type:markdown id: tags:
# Example problem: Monte-Carlo estimate of $\pi$
<img src="https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif" width="25%" align=left alt="PI monte-carlo estimate"/>
## Problem description
Suppose we want to estimate the number $\pi$ using a [Monte-Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods), i.e. obtain a numerical estimate based on random sampling, and that we want at least single precision floating point accuracy.
We take advantage of the fact that the area of a quarter circle with unit radius is $\pi/4$, and that hence the probability that a point chosen uniformly at random inside the unit square also lies within that quarter circle is $\pi/4$.
So for $N$ randomly chosen pairs $(x, y)$ with $x\in[0, 1)$ and $y\in[0, 1)$, we count the number $N_{circ}$ of pairs that satisfy $x^2 + y^2 < 1$ and estimate $\pi \approx 4 \cdot N_{circ} / N$.
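Note that the statistical error of this estimator shrinks only like $1/\sqrt{N}$: each point falls inside the quarter circle with probability $p = \pi/4$, so the standard error of the estimate is $4\sqrt{p(1-p)/N} \approx 1.64/\sqrt{N}$. This slow convergence is why very large sample sizes are needed below.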
%% Cell type:markdown id: tags:
## Monte-Carlo estimate with NumPy on a single CPU
* NumPy is the fundamental package for scientific computing with Python (https://numpy.org/).
* It contains a powerful n-dimensional array object and useful random number capabilities.
%% Cell type:code id: tags:
``` python
import numpy
```
%% Cell type:code id: tags:
``` python
def calculate_pi_single(size_in_bytes):
    """Calculate pi using a Monte Carlo method."""

    rand_array_shape = (int(size_in_bytes / 8 / 2), 2)

    # 2D random array with positions (x, y)
    xy = numpy.random.uniform(low=0.0, high=1.0, size=rand_array_shape)

    # check if position (x, y) is in unit circle
    xy_inside_circle = (xy ** 2).sum(axis=1) < 1

    # pi is the fraction of points in circle x 4
    pi = 4 * xy_inside_circle.sum() / xy_inside_circle.size

    print(f"\nfrom {xy.nbytes / 1e9} GB randomly chosen positions")
    print(f"   pi estimate: {pi}")
    print(f"   pi error: {abs(pi - numpy.pi)}\n")

    return pi
```
%% Cell type:markdown id: tags:
### Let's calculate...
Observe how the error decreases with an increasing number of randomly chosen positions!
%% Cell type:code id: tags:
``` python
%time pi = calculate_pi_single(size_in_bytes=10_000_000) # 10 MB
%time pi = calculate_pi_single(size_in_bytes=100_000_000) # 100 MB
%time pi = calculate_pi_single(size_in_bytes=1_000_000_000) # 1 GB
```
%% Cell type:markdown id: tags:
### Are we already better than single precision floating point resolution?
%% Cell type:code id: tags:
``` python
numpy.finfo(numpy.float32)
```
%% Cell type:markdown id: tags:
## We won't be able to scale the problem to several gigabytes or terabytes!
%% Cell type:markdown id: tags:
### Problems
* the NumPy-only approach runs on a single CPU and is slow (we could parallelise it ourselves with the [multiprocessing](https://docs.python.org/3.8/library/multiprocessing.html) and/or [threading](https://docs.python.org/3.8/library/threading.html) libraries; see the sketch below)
* frontend/login node compute resources are shared, so the CPU, memory (and I/O bandwidth) demands of different users will collide
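%% Cell type:markdown id: tags:
A minimal sketch of the multiprocessing route (the helper name and sample sizes are illustrative, not part of the original notebook): run several independent estimates in a process pool and average them. Depending on the platform's process start method, worker functions defined inside a notebook may need to live in an importable module.
%% Cell type:code id: tags:
``` python
import multiprocessing

import numpy


def pi_estimate(size_in_bytes):
    """One independent Monte-Carlo estimate of pi (illustrative helper)."""
    xy = numpy.random.uniform(low=0.0, high=1.0, size=(int(size_in_bytes / 8 / 2), 2))
    return 4 * ((xy ** 2).sum(axis=1) < 1).mean()


# average 8 independent 125 MB estimates, one per process
with multiprocessing.Pool(processes=8) as pool:
    estimates = pool.map(pi_estimate, [125_000_000] * 8)

print(sum(estimates) / len(estimates))
```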
%% Cell type:markdown id: tags:
## Monte-Carlo estimate with Dask on multiple CPUs
We define a local Dask cluster with one worker that uses 8 threads and up to 24 GB of memory.
%% Cell type:code id: tags:
``` python
import dask.distributed
```
%% Cell type:code id: tags:
``` python
cluster = dask.distributed.LocalCluster(
    n_workers=1, threads_per_worker=8, memory_limit=24e9,
    ip="0.0.0.0",
)
client = dask.distributed.Client(cluster)
client
```
%% Cell type:markdown id: tags:
### Use dask.array for randomly chosen positions
%% Cell type:code id: tags:
``` python
import numpy, dask.array
```
%% Cell type:code id: tags:
``` python
def calculate_pi_dask(size_in_bytes, number_of_chunks):
    """Calculate pi using a Monte Carlo method."""

    array_shape = (int(size_in_bytes / 8 / 2), 2)
    chunk_size = (int(array_shape[0] / number_of_chunks), 2)

    # 2D random positions array using dask.array
    xy = dask.array.random.uniform(
        low=0.0, high=1.0, size=array_shape,
        # specify chunk size, i.e. task number
        chunks=chunk_size,
    )

    xy_inside_circle = (xy ** 2).sum(axis=1) < 1
    pi = 4 * xy_inside_circle.sum() / xy_inside_circle.size

    # start the Dask calculation
    pi = pi.compute()

    print(f"\nfrom {xy.nbytes / 1e9} GB randomly chosen positions")
    print(f"   pi estimate: {pi}")
    print(f"   pi error: {abs(pi - numpy.pi)}\n")

    display(xy)

    return pi
```
%% Cell type:markdown id: tags:
### Let's calculate again...
Observe how the wall time decreases for the 1 GB and 10 GB random-sample $\pi$ estimates compared to the single-CPU NumPy runs above!
%% Cell type:code id: tags:
``` python
%time pi = calculate_pi_dask(size_in_bytes=1_000_000_000, number_of_chunks=10) # 1 GB
```
%% Cell type:code id: tags:
``` python
%time pi = calculate_pi_dask(size_in_bytes=10_000_000_000, number_of_chunks=100) # 10 GB
```
%% Cell type:markdown id: tags:
### Let's go larger than memory...
Because Dask splits the computation into many small, manageable tasks, we can scale up easily!
%% Cell type:code id: tags:
``` python
%time pi = calculate_pi_dask(size_in_bytes=100_000_000_000, number_of_chunks=250) # 100 GB
```
%% Cell type:markdown id: tags:
### Are we now better than single precision floating point resolution?
Not yet, at least not if we require the error to be an order of magnitude below the single-precision resolution...
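A rough back-of-envelope estimate: the Monte-Carlo error scales like $\approx 1.64/\sqrt{N}$ (see above), so pushing it below the single-precision resolution of $\approx 1.2\times10^{-7}$ would require $N \approx (1.64 / 1.2\times10^{-7})^2 \approx 2\times10^{14}$ sample pairs, i.e. roughly 3 petabytes of random $(x, y)$ positions at 16 bytes per pair.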
%% Cell type:code id: tags:
``` python
numpy.finfo(numpy.float32)
```
%% Cell type:markdown id: tags:
## We could increase the local cluster CPU resources...
However, the local cluster above is always limited by the memory and CPU resources of a single compute node; to use more than one node, workers have to be requested through the HPC batch system, for example with Dask-Jobqueue (see the sketch below).
%% Cell type:code id: tags:
``` python
# %time pi = calculate_pi_dask(size_in_bytes=1_000_000_000_000, number_of_chunks=2_500)  # 1 TB
```
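%% Cell type:markdown id: tags:
To go beyond a single node, the Dask-Jobqueue library from the workshop programme above can request workers through the batch system. A minimal sketch, assuming `dask_jobqueue` is installed and with placeholder queue and resource values that must be adapted to your site's SLURM configuration:
%% Cell type:code id: tags:
``` python
from dask_jobqueue import SLURMCluster

import dask.distributed

# each SLURM job provides one worker with 24 cores and 100 GB of memory;
# queue name and walltime are placeholders for your site's configuration
cluster = SLURMCluster(
    queue="batch",
    cores=24,
    memory="100GB",
    walltime="01:00:00",
)
cluster.scale(jobs=4)  # request 4 such jobs, i.e. 4 x 24 cores in total

client = dask.distributed.Client(cluster)
```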