diff --git a/001-Extensions/Dask_JUWELS.ipynb b/001-Extensions/Dask_JUWELS.ipynb new file mode 100644 index 0000000000000000000000000000000000000000..573ef9a96b223542ee1581dbe6b3bd7371c060fb --- /dev/null +++ b/001-Extensions/Dask_JUWELS.ipynb @@ -0,0 +1,392 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dask Extension" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This notebook will give you a short introduction to the Dask Extension on JUWELS. It allows you to run jobs on the compute nodes, even if your JupyterLab is running interactively on the login node. \n", + "First, you have to define which project and partition the jobs should run in." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "queue = \"batch\" # batch, gpus, develgpus, etc.\n", + "project = \"cstvs\" # your project: zam, training19xx, etc." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Monte-Carlo Estimate of $\\pi$\n", + "\n", + "We want to estimate the number $\\pi$ using a [Monte-Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods), exploiting the fact that the area of a quarter circle of unit radius is $\\pi/4$ and that, hence, the probability that a randomly chosen point in a unit square lies within a unit circle centered at a corner of the unit square is $\\pi/4$ as well. So for $N$ randomly chosen pairs $(x, y)$ with $x\\in[0, 1)$ and $y\\in[0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $(x^2 + y^2) < 1$ and estimate $\\pi \\approx 4 \\cdot N_{circ} / N$.\n", + "\n", + "[<img src=\"https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif\" \n", + " width=\"50%\" \n", + " align=top\n", + " alt=\"PI monte-carlo estimate\">](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Core Lessons\n", + "\n", + "- Setting up SLURM (and other jobqueue) clusters\n", + "- Scaling clusters\n", + "- Adaptive clusters" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Set up a SLURM cluster\n", + "\n", + "We'll create a SLURM cluster and have a look at the job script used to start workers on the HPC scheduler."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import dask\n", + "from dask.distributed import Client\n", + "from dask_jobqueue import SLURMCluster\n", + "import os\n", + "\n", + "cluster = SLURMCluster(\n", + " cores=24,\n", + " processes=2,\n", + " memory=\"100GB\",\n", + " shebang=\"#!/usr/bin/env bash\",\n", + " queue=queue,\n", + " dashboard_address=\":56755\",\n", + " walltime=\"00:30:00\",\n", + " local_directory=\"/tmp\",\n", + " death_timeout=\"15s\",\n", + " interface=\"ib1\",\n", + " log_directory=f'{os.environ[\"HOME\"]}/dask_jobqueue_logs/',\n", + " project=project,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(cluster.job_script())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client = Client(cluster)\n", + "client" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## You can visit the Dask Dashboard at the following URL: \n", + "```\n", + "https://jupyter-jsc.fz-juelich.de/user/<user_name>/<lab_name>/proxy/<port>/status\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## You can integrate it into your JupyterLab environment by putting the link into the Dask Extension" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Afterwards, you can click the orange buttons to open a new tab in your JupyterLab environment." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Scale the cluster to two nodes\n", + "\n", + "A look at the Dashboard reveals that there are no workers in the cluster. Let's start 4 workers (in 2 SLURM jobs).\n", + "\n", + "For the distinction between _workers_ and _jobs_, see [the Dask jobqueue docs](https://jobqueue.dask.org/en/latest/howitworks.html#workers-vs-jobs)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cluster.scale(4) # scale to 4 _workers_" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The Monte Carlo Method" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import dask.array as da\n", + "import numpy as np\n", + "\n", + "\n", + "def calc_pi_mc(size_in_bytes, chunksize_in_bytes=200e6):\n", + " \"\"\"Calculate PI using a Monte Carlo estimate.\"\"\"\n", + "\n", + " size = int(size_in_bytes / 8)\n", + " chunksize = int(chunksize_in_bytes / 8)\n", + "\n", + " xy = da.random.uniform(0, 1, size=(size // 2, 2), chunks=(chunksize // 2, 2))\n", + "\n", + " in_circle = (xy ** 2).sum(axis=-1) < 1\n", + " pi = 4 * in_circle.mean()\n", + "\n", + " return pi\n", + "\n", + "\n", + "def print_pi_stats(size, pi, time_delta, num_workers):\n", + " \"\"\"Print pi, calculate offset from true value, and print some stats.\"\"\"\n", + " print(\n", + " f\"{size / 1e9} GB\\n\"\n", + " f\"\\tMC pi: {pi : 13.11f}\"\n", + " f\"\\tErr: {abs(pi - np.pi) : 10.3e}\\n\"\n", + " f\"\\tWorkers: {num_workers}\"\n", + " f\"\\t\\tTime: {time_delta : 7.3f}s\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The actual calculations\n", + "\n", + "We loop over different volumes of double-precision random numbers and estimate $\\pi$ as described above.\n",
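+ "\n", + "To make the estimator concrete, here is a plain-NumPy version of the same calculation (a minimal serial sketch for illustration only; the sample count of ten million is an arbitrary choice and not part of the benchmark below):\n", + "\n", + "```python\n", + "import numpy as np\n", + "\n", + "# Draw pairs (x, y) in the unit square and count the fraction inside the quarter circle.\n", + "xy = np.random.uniform(0, 1, size=(10_000_000, 2))\n", + "pi_estimate = 4 * ((xy ** 2).sum(axis=-1) < 1).mean()\n", + "print(pi_estimate)\n", + "```"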
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from time import time, sleep" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for size in (1e9 * n for n in (1, 10, 100)):\n", + "\n", + " start = time()\n", + " pi = calc_pi_mc(size).compute()\n", + " elaps = time() - start\n", + "\n", + " print_pi_stats(\n", + " size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Is it running?" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To check whether jobs have been started for you, just run the following command in a terminal: \n", + "```\n", + "squeue | grep ${USER}\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Scaling the Cluster to twice its size\n", + "\n", + "We double the number of workers and then re-run the experiments." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "new_num_workers = 2 * len(cluster.scheduler.workers)\n", + "\n", + "print(f\"Scaling from {len(cluster.scheduler.workers)} to {new_num_workers} workers.\")\n", + "\n", + "cluster.scale(new_num_workers)\n", + "\n", + "sleep(10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Re-run same experiments with doubled cluster" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for size in (1e9 * n for n in (1, 10, 100)):\n", + "\n", + " start = time()\n", + " pi = calc_pi_mc(size).compute()\n", + " elaps = time() - start\n", + "\n", + " print_pi_stats(\n", + " size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Automatically Scaling the Cluster\n", + "\n", + "We want each calculation to take only a few seconds. 
Dask will try to add more workers to the cluster when workloads are high and remove workers when they are idle.\n", + "\n", + "_**Watch** how the cluster will scale down to the minimum a few seconds after being made adaptive._" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ca = cluster.adapt(minimum=4, maximum=100)\n", + "\n", + "sleep(4) # Allow for scale-down" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Repeat the calculation from above with larger workloads\n", + "\n", + "(And watch the dashboard!)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for size in (n * 1e9 for n in (1, 10, 100)):\n", + "\n", + " start = time()\n", + " pi = calc_pi_mc(size, min(size / 1000, 500e6)).compute()\n", + " elaps = time() - start\n", + "\n", + " print_pi_stats(\n", + " size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)\n", + " )\n", + "\n", + " sleep(20) # allow for scale-down time" + ] + } + ], + "metadata": { + "anaconda-cloud": {}, + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.8" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}