diff --git a/001-Extensions/Dask_JURECA.ipynb b/001-Extensions/Dask_JURECA.ipynb new file mode 100644 index 0000000000000000000000000000000000000000..dce735b86a28aaa0c2e5000fd2a2560454d6a16e --- /dev/null +++ b/001-Extensions/Dask_JURECA.ipynb @@ -0,0 +1,549 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dask Extension" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This notebook will give you a short introduction into the Dask Extension on JURECA. It allows you to run Jobs on the compute nodes, even if your JupyterLab is running interactively on the login node. \n", + "First you have to define on which project and partition it should be running." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "queue = \"batch\" # batch, gpus, develgpus, etc.\n", + "project = \"cstvs\" # your project: zam, training19xx, etc." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Monte-Carlo Estimate of $\\pi$\n", + "\n", + "We want to estimate the number $\\pi$ using a [Monte-Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods) exploiting that the area of a quarter circle of unit radius is $\\pi/4$ and that hence the probability of any randomly chosen point in a unit square to lie in a unit circle centerd at a corner of the unit square is $\\pi/4$ as well. So for N randomly chosen pairs $(x, y)$ with $x\\in[0, 1)$ and $y\\in[0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $(x^2 + y^2) < 1$ and estimage $\\pi \\approx 4 \\cdot N_{circ} / N$.\n", + "\n", + "[<img src=\"https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif\" \n", + " width=\"50%\" \n", + " align=top\n", + " alt=\"PI monte-carlo estimate\">](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Core Lessons\n", + "\n", + "- setting up SLURM (and other jobqueue) clusters\n", + "- Scaling clusters\n", + "- Adaptive clusters" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Set up a Slurm cluster\n", + "\n", + "We'll create a SLURM cluster and have a look at the job-script used to start workers on the HPC scheduler." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import dask\n", + "from dask.distributed import Client\n", + "from dask_jobqueue import SLURMCluster\n", + "import os\n", + "\n", + "cluster = SLURMCluster(\n", + " cores=24,\n", + " processes=2,\n", + " memory=\"100GB\",\n", + " shebang='#!/usr/bin/env bash',\n", + " queue=queue,\n", + " dashboard_address=\":56755\",\n", + " walltime=\"00:30:00\",\n", + " local_directory='/tmp',\n", + " death_timeout=\"15s\",\n", + " interface=\"ib0\",\n", + " log_directory=f'{os.environ[\"HOME\"]}/dask_jobqueue_logs/',\n", + " project=project)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "#!/usr/bin/env bash\n", + "\n", + "#SBATCH -J dask-worker\n", + "#SBATCH -e /p/home/jusers/kreuzer1/jureca/dask_jobqueue_logs//dask-worker-%J.err\n", + "#SBATCH -o /p/home/jusers/kreuzer1/jureca/dask_jobqueue_logs//dask-worker-%J.out\n", + "#SBATCH -p batch\n", + "#SBATCH -A cstvs\n", + "#SBATCH -n 1\n", + "#SBATCH --cpus-per-task=24\n", + "#SBATCH --mem=94G\n", + "#SBATCH -t 00:30:00\n", + "\n", + "JOB_ID=${SLURM_JOB_ID%;*}\n", + "\n", + "/usr/local/software/jureca/Stages/Devel-2019a/software/Python/3.6.8-GCCcore-8.3.0/bin/python -m distributed.cli.dask_worker tcp://10.80.32.31:35250 --nthreads 12 --nprocs 2 --memory-limit 50.00GB --name name --nanny --death-timeout 15s --local-directory /tmp --interface ib0\n", + "\n" + ] + } + ], + "source": [ + "print(cluster.job_script())" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "<table style=\"border: 2px solid white;\">\n", + "<tr>\n", + "<td style=\"vertical-align: top; border: 0px solid white\">\n", + "<h3 style=\"text-align: left;\">Client</h3>\n", + "<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n", + " <li><b>Scheduler: </b>tcp://10.80.32.31:35250</li>\n", + " <li><b>Dashboard: </b><a href='http://10.80.32.31:56755/status' target='_blank'>http://10.80.32.31:56755/status</a>\n", + "</ul>\n", + "</td>\n", + "<td style=\"vertical-align: top; border: 0px solid white\">\n", + "<h3 style=\"text-align: left;\">Cluster</h3>\n", + "<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n", + " <li><b>Workers: </b>0</li>\n", + " <li><b>Cores: </b>0</li>\n", + " <li><b>Memory: </b>0 B</li>\n", + "</ul>\n", + "</td>\n", + "</tr>\n", + "</table>" + ], + "text/plain": [ + "<Client: 'tcp://10.80.32.31:35250' processes=0 threads=0, memory=0 B>" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client = Client(cluster)\n", + "client" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## You can visit the Dask Dashboard at the following url: \n", + "```\n", + "https://jupyter-jsc.fz-juelich.de/user/<user_name>/<lab_name>/proxy/<port>/status\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## You can integrate it into your JupyterLab environment by putting the link into the Dask Extension" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Afterwards you can press on the orange buttons to open a new tab in your JupyterLab Environment." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Scale the cluster to two nodes\n", + "\n", + "A look at the Dashboard reveals that there are no workers in the clusetr. Let's start 4 workers (in 2 SLURM jobs).\n", + "\n", + "For the distiction between _workers_ and _jobs_, see [the Dask jobqueue docs](https://jobqueue.dask.org/en/latest/howitworks.html#workers-vs-jobs)." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "cluster.scale(4) # scale to 4 _workers_" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The Monte Carlo Method" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "import dask.array as da\n", + "import numpy as np\n", + "\n", + "\n", + "def calc_pi_mc(size_in_bytes, chunksize_in_bytes=200e6):\n", + " \"\"\"Calculate PI using a Monte Carlo estimate.\"\"\"\n", + "\n", + " size = int(size_in_bytes / 8)\n", + " chunksize = int(chunksize_in_bytes / 8)\n", + "\n", + " xy = da.random.uniform(0, 1, size=(size / 2, 2), chunks=(chunksize / 2, 2))\n", + "\n", + " in_circle = (xy ** 2).sum(axis=-1) < 1\n", + " pi = 4 * in_circle.mean()\n", + "\n", + " return pi\n", + "\n", + "\n", + "def print_pi_stats(size, pi, time_delta, num_workers):\n", + " \"\"\"Print pi, calculate offset from true value, and print some stats.\"\"\"\n", + " print(\n", + " f\"{size / 1e9} GB\\n\"\n", + " f\"\\tMC pi: {pi : 13.11f}\"\n", + " f\"\\tErr: {abs(pi - np.pi) : 10.3e}\\n\"\n", + " f\"\\tWorkers: {num_workers}\"\n", + " f\"\\t\\tTime: {time_delta : 7.3f}s\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The actual calculations\n", + "\n", + "We loop over different volumes of double-precision random numbers and estimate $\\pi$ as described above." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "from time import time, sleep" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "1.0 GB\n", + "\tMC pi: 3.14163532800\tErr: 4.267e-05\n", + "\tWorkers: 4\t\tTime: 14.035s\n", + "10.0 GB\n", + "\tMC pi: 3.14159999360\tErr: 7.340e-06\n", + "\tWorkers: 4\t\tTime: 1.755s\n", + "100.0 GB\n", + "\tMC pi: 3.14160576512\tErr: 1.311e-05\n", + "\tWorkers: 4\t\tTime: 9.881s\n" + ] + } + ], + "source": [ + "for size in (1e9 * n for n in (1, 10, 100)):\n", + "\n", + " start = time()\n", + " pi = calc_pi_mc(size).compute()\n", + " elaps = time() - start\n", + "\n", + " print_pi_stats(\n", + " size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Scaling the Cluster to twice its size\n", + "\n", + "We increase the number of workers by 2 and the re-run the experiments." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Scaling from 4 to 8 workers.\n" + ] + } + ], + "source": [ + "new_num_workers = 2 * len(cluster.scheduler.workers)\n", + "\n", + "print(f\"Scaling from {len(cluster.scheduler.workers)} to {new_num_workers} workers.\")\n", + "\n", + "cluster.scale(new_num_workers)\n", + "\n", + "sleep(10)" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "<table style=\"border: 2px solid white;\">\n", + "<tr>\n", + "<td style=\"vertical-align: top; border: 0px solid white\">\n", + "<h3 style=\"text-align: left;\">Client</h3>\n", + "<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n", + " <li><b>Scheduler: </b>tcp://10.80.32.31:35250</li>\n", + " <li><b>Dashboard: </b><a href='http://10.80.32.31:56755/status' target='_blank'>http://10.80.32.31:56755/status</a>\n", + "</ul>\n", + "</td>\n", + "<td style=\"vertical-align: top; border: 0px solid white\">\n", + "<h3 style=\"text-align: left;\">Cluster</h3>\n", + "<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n", + " <li><b>Workers: </b>4</li>\n", + " <li><b>Cores: </b>48</li>\n", + " <li><b>Memory: </b>200.00 GB</li>\n", + "</ul>\n", + "</td>\n", + "</tr>\n", + "</table>" + ], + "text/plain": [ + "<Client: 'tcp://10.80.32.31:35250' processes=4 threads=48, memory=200.00 GB>" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Re-run same experiments with doubled cluster" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "1.0 GB\n", + "\tMC pi: 3.14156992000\tErr: 2.273e-05\n", + "\tWorkers: 8\t\tTime: 0.863s\n", + "10.0 GB\n", + "\tMC pi: 3.14159750400\tErr: 4.850e-06\n", + "\tWorkers: 8\t\tTime: 2.203s\n", + "100.0 GB\n", + "\tMC pi: 3.14156159168\tErr: 3.106e-05\n", + "\tWorkers: 8\t\tTime: 5.774s\n" + ] + } + ], + "source": [ + "for size in (1e9 * n for n in (1, 10, 100)):\n", + "\n", + " start = time()\n", + " pi = calc_pi_mc(size).compute()\n", + " elaps = time() - start\n", + "\n", + " print_pi_stats(\n", + " size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Automatically Scaling the Cluster\n", + "\n", + "We want each calculation to take only a few seconds. Dask will try to add more workers to the cluster when workloads are high and remove workers when idling.\n", + "\n", + "_**Watch** how the cluster will scale down to the minimum a few seconds after being made adaptive._" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "ca = cluster.adapt(minimum=4, maximum=100)\n", + "\n", + "sleep(4) # Allow for scale-down" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "<table style=\"border: 2px solid white;\">\n", + "<tr>\n", + "<td style=\"vertical-align: top; border: 0px solid white\">\n", + "<h3 style=\"text-align: left;\">Client</h3>\n", + "<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n", + " <li><b>Scheduler: </b>tcp://10.80.32.31:35250</li>\n", + " <li><b>Dashboard: </b><a href='http://10.80.32.31:56755/status' target='_blank'>http://10.80.32.31:56755/status</a>\n", + "</ul>\n", + "</td>\n", + "<td style=\"vertical-align: top; border: 0px solid white\">\n", + "<h3 style=\"text-align: left;\">Cluster</h3>\n", + "<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n", + " <li><b>Workers: </b>4</li>\n", + " <li><b>Cores: </b>48</li>\n", + " <li><b>Memory: </b>200.00 GB</li>\n", + "</ul>\n", + "</td>\n", + "</tr>\n", + "</table>" + ], + "text/plain": [ + "<Client: 'tcp://10.80.32.31:35250' processes=8 threads=96, memory=400.00 GB>" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Repeat the calculation from above with larger work loads\n", + "\n", + "(And watch the dash board!)" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "1.0 GB\n", + "\tMC pi: 3.14140582400\tErr: 1.868e-04\n", + "\tWorkers: 4\t\tTime: 2.682s\n", + "10.0 GB\n", + "\tMC pi: 3.14159994880\tErr: 7.295e-06\n", + "\tWorkers: 4\t\tTime: 3.225s\n", + "100.0 GB\n", + "\tMC pi: 3.14156351552\tErr: 2.914e-05\n", + "\tWorkers: 20\t\tTime: 31.772s\n" + ] + } + ], + "source": [ + "for size in (n * 1e9 for n in (1, 10, 100)):\n", + "\n", + " start = time()\n", + " pi = calc_pi_mc(size, min(size / 1000, 500e6)).compute()\n", + " elaps = time() - start\n", + "\n", + " print_pi_stats(\n", + " size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)\n", + " )\n", + "\n", + " sleep(20) # allow for scale-down time" + ] + } + ], + "metadata": { + "anaconda-cloud": {}, + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.8" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/001-Extensions/MonteCarloJURON.ipynb b/001-Extensions/Dask_JURON.ipynb similarity index 100% rename from 001-Extensions/MonteCarloJURON.ipynb rename to 001-Extensions/Dask_JURON.ipynb