# Interactive Parallel Computing with IPython Parallel

<div class="dateauthor">
21 June 2022 | Jan H. Meinke
</div>

*Computers have more than one core.* Wouldn't it be nice if we could use all the cores of our local machine or a compute node of a cluster from our [Jupyter][IP] notebook? 

Click on the ``+``-sign at the top of the Files tab on the left to start a new launcher. In the launcher click on Terminal. A terminal will open as a new tab. Grab the tab and pull it to the right to have the terminal next to your notebook.

**Note**: The terminal does not have the same modules loaded as the notebook. To fix that type `source $PROJECT_training2219/hpcpy22`.

In the terminal type ``ipcluster``. You'll see the help message telling you that you need to give it subcommand. Take a look at the message and then enter 

``` bash
export OMP_NUM_THREADS=32
ipcluster start --n=4
```

This will start a cluster with four engines and should limit the number of threads to 32 threads per engine to avoid oversubscription.

> If you use the classical [Jupyter][IP] notebook, this is even easier if you have the cluster extension installed. (We don't have that one on our JupyterHub, yet). One of the tabs of your browser has the title "Home". If you switch to that tab, there are several tabs within the web page. One of them is called "IPython Clusters". Click on "IPython Clusters", increase the number of engines in the "default" profile to 4, and click on Start. The status changes from stopped to running. After you did that come back to this tab.

>If the "Clusters" tab shows the message:

>>    Clusters tab is now provided by IPython parallel. See IPython parallel for installation details.
    
> you need to quit your notebook server (make sure all your notebooks ar saved) and run the command 

>>    ipcluster nbextension enable
    
>Now, when you start `jupyter notebook` you should see a field that lets you set the number of engines in the "IPython Clusters" tab.




[IP]: http://www.jupyter.org

## Overview

IPyParallel has three parts: controller, engines, and client. The controller is the central hub. The client communicates only with the controller. The controller keeps track of the available engines and forwards requests from the client to the engines. It schedules the work and monitors its status. The results are communicated through the controller back to the client.

All three components can run on different computers. A Jupyter notebook might run on your laptop and connect to an ipcontroller on a JUWELS login node, which in turn talks to engines running on a compute node.

![IPython Parallel Architecture](images/ipyparallel.svg)

## The Client

Now let's see how we access the "Cluster". [IPython][IP] comes with a module [ipyparallel][IPp] that is used to access the engines, we just started. We first need to import Client.

[IPp]: https://ipyparallel.readthedocs.io/en/latest/
[IP]: http://www.ipython.org

In [None]:
from ipyparallel import Client

In [None]:
rc = Client(profile="default")

We can list the ids of the engines attached

In [None]:
rc.ids

## Views

A *view* gives us access to a set of engines using a given scheduler. There are two types of views: a *direct view* and a *load-balanced* view.

As the name implies a *direct view* gives us direct control of the engines. We can push and pull data and apply functions using a couple of different methods. We are in control what runs where.

A *load-balanced view* tries to balance the work between all the engines. We can submit tasks to it in the same way as before, but with a *load-balanced view*, the scheduler decides where a function is executed. It's also possible to define dependencies between tasks to build a dependency graph or even build this graph by hand. You'll learn a little more about dependencies in [Parallel, Task-Based Computing with Load Balancing on your Local Machine][LocalTaskParallel]

Let's start with a *direct view* and learn about the methods used to execute code on the engines and move data around. 

We create a *direct view* of the engines by slicing the Client object:

[LocalTaskParallel]: LocalTaskParallel.ipynb

In [None]:
v01 = rc[0:2] # First two engines (0 and 1)
v23 = rc[2:4] # Engines 2 and 3
dview = rc[:] # All available engines

## Parallel Magic

Before we go into the details of the interface of a `DirectView`--that's the name of the class, let's look at IPython magic.

IPython makes it very easy to use IPyParallel. It provides the magic commands ``%px`` and ``%%px`` to execute code in parallel. The target attribute is used to pick the engines, you want. By default, all the engines of the last Client object created are used. You can also specify if a command should be executed `blocking`--the default--or `non-blocking`.

Note, the commands prefixed with ``%px`` are *not* executed locally. 

In [None]:
%px import numpy as np # import numpy on all engines as np
import numpy as np # do it locally, too.

Since it's fairly common that you want to execute a cell remotely and locally, there's an option for that. Just add ``--local``.

**Note**: This works only for ``%%px`` not ``%px``.

In [None]:
%%px --local 
np.__version__ # print the numpy version of the engines. Note how the output is prefixed. It can be accessed that way, too. 

 The engines run ipython. Magic commands work, too.

In [None]:
%%px --local
%matplotlib inline

In [None]:
%%px --local 
import matplotlib.pyplot as plt


The cell magic command ``%%px`` lets us execute more than one statement. The option ``--target`` lets us choose which engines we want to use. Here we are using engines 0 to 3.

In [None]:
%%px --target 0:4
a = np.random.random([10,10])
plt.imshow(a, interpolation="nearest")

Yes, the output can be graphical.

Remember that the imports, we performed with ``%px`` are not available in our notebook. We can fix that by using

In [None]:
with rc[:].sync_imports():
    import matplotlib.pyplot

Unfortunately mapping of namespaces does not work that way.

## Using the Direct View

As mentioned above a *direct view* lets you control each engine directly. You can also decide if a command should be blocking or not.

We can, for example, create two random 100 by 100 element matrices on each engine, multiply them, and then display them. On each engine the code would look like this

In [None]:
a = np.random.random([100, 100])
b = np.random.random([100, 100])
c = a.dot(b)
plt.imshow(c, interpolation="nearest")

As we learned before, we can use the ``%%px`` cell magic to execute this on all engines. Here we use the ``--target`` option to specify every second engine starting at 0. ``%px`` and ``%%px`` use the currently active view. By default that's the first view created. You can make a view active by calling ``view.activate(suffix)``. Use ``view.activate?`` to learn more about suffix.

In [None]:
%%px --target 0::2
a = np.random.random([100, 100])
b = np.random.random([100, 100])
c = a.dot(b)
plt.imshow(c, interpolation="nearest")

Magic commands are blocking by default, i.e., the next cell can only be executed after all the engines have finished their work. We can pass the option ``--noblock`` to change that behavior.

In [None]:
%%px
import threadpoolctl
threadpoolctl.threadpool_limits(limits=32, user_api='blas')

In [None]:
%%px --noblock
a = np.random.random([2000, 2000])
b = np.random.random([2000, 2000])
c = a.dot(b)
c.sum() / 4.0e9

We get an AsyncResult back. We can continue working in our notebook and pick up the result, when we are ready to do so.

In [None]:
%result

## Execute and Apply

The foundation of executing code with a ``DirectView`` is ``apply``. It calls a function (the first argument) with args and kwargs. The values of the arguments are taken from the notebook and pushed to the engines.

In [None]:
c = dview.apply(lambda a,b: np.dot(a,b), a, b) # This uses a and b from the notebook

In [None]:
c.done()

In [None]:
c = c.result()

In [None]:
c[0].shape

The function can be a lambda function:

In [None]:
c = dview.apply(lambda a,b : a + b, a, b)
c.done()
c = c.result()

In [None]:
c[0].shape

It can also be ``exec``:

In [None]:
dview.apply(exec, 'c = a + b') # Note, this uses the variables defined on the engines.

In [None]:
dview['c'][0].shape

In [None]:
dview.execute('c=a+b')

In [None]:
dview['c']

## Remote functions

There are two decorators ``@parallel`` and ``@remote`` that can create functions that are executed on the engines.

A function decorated with ``@parallel`` takes a sequence or an array as argument and distributes the work over the engines. Each engine still gets a sequence or array and should return one, too.

In [None]:
from ipyparallel import parallel, remote

In [None]:
@dview.parallel(block=True)
def even(x):
    """Return only even elements of x
    
    Paramters
    ---------
    x : sequence or array
        A list of values
    
    Returns
    -------
    res : like x
        even elements of x
    """
    return [e for e in x if not e % 2]
#    return None if x % 2 else x
    

In [None]:
even(list(range(0,16)))

A `remote` function, on the other hand just runs on each engine with the full set of data.

In [None]:
@dview.remote(block=True)
def scale(a):
    for i in range(len(a)):
        a[i] *= 2
        
    return a

In [None]:
scale(list(range(0, 16)))

## Moving data around

So far the runtime has taken care of moving data to and from the engines, but we can do this explicitely. There are 4 commands to do that:

* push
* pull
* scatter
* gather

Push takes a dictionary with the remote variable name as key:

In [None]:
dview.block=True

In [None]:
localA = list(range(10))
dview.push(dict(remoteA=localA))

We can get a variable back with pull. 

In [None]:
dview.pull('remoteA')

There's also a shorthand notation, where we treat the view as a dictionary.

In [None]:
dview['remoteA']

The methods ``push`` and ``pull`` push/pull the same data to/from all engines. They don't take a list and distribute it. That's what ``scatter`` and ``gather`` do. 

In [None]:
dview.scatter('a',list(range(24)))
dview['a']

In [None]:
dview.gather('a')

## List comprehension

With those methods at hand, we can build a parallel list comprehension.

In [None]:
dview.scatter('x',list(range(64)))

In [None]:
%px y = [i**10 for i in x]

In [None]:
y = dview.gather('y')
y

## Exploring Latency and Bandwidth

Latency (the time until something happens) and bandwidth (the amount of data we get through the network) are two important properties of your parallel system that define what is practical and what is not. We will use the ``%timeit`` magic to measure these properties. ``%timit`` and its sibbling ``%%timeit`` measure the run time of a statement (cell in the case of ``%%timeit``) by executing the statement multiple times (by default at least 3 times). For short running routines many loops of 3 executions are performed and the minimum time measured is then displayed. The number of loops and the number of executions can be adjusted. Take a look at the documentation. Give it a try.

Lets first see how long it takes to send off a new task using ``execute`` and ``apply``.

In [None]:
dview.block = False

In [None]:
%%px --noblock --local
a = np.random.random([2000, 2000])
b = np.random.random([2000, 2000])
c = a.dot(b)
c.sum() / 4.0e9

Let's first execute nothing.

In [None]:
%timeit dview.execute('')

Next we'll use a very minimal function. It just returns its argument. In this case the argument is empty.

In [None]:
%timeit dview.apply(lambda x : x, '')

Here, we'll tell every engine to perform a matrix-matrix multiplication (see [Matrix-Matrix Multiplication Using a DirectView](Matrix-Matrix-Multiplication-Using-a-DirectView) below for more about matrix multiplications)

In [None]:
%timeit -n 1 -r 4 dview.execute('c = a.dot(b)')

Now, we'll make the execution blocking. This means, we are measuring the time the function needs to return a result instead of just the time needed to launch the task.

In [None]:
dview.block=True

In [None]:
%timeit dview.execute('')

In [None]:
%timeit dview.apply(lambda x : x, '')

In [None]:
%timeit -n 1 -r 4 rc[0].execute('c = a.dot(b)')

In [None]:
%timeit a.dot(b)

In [None]:
dview.block=False

We can start about 500 parallel tasks per second and finish about a quarter as many. This gives an estimate of the granularity we need to use this model for efficient parallelization. Any task that takes less time than this will be dominated by the overhead.

To get an idea about the bandwidth available let's push some arrays to the engines. We make this blocking.

In [None]:
dview.block=True

In [None]:
a = np.random.random(256*1024)

In [None]:
%timeit dview.push(dict(a=a))
%timeit dview.push(dict(a=a[:128*1024]))
%timeit dview.push(dict(a=a[:64*1024]))
%timeit dview.push(dict(a=a[:32*1024]))
%timeit dview.push(dict(a=a[:16*1024]))
%timeit dview.push(dict(a=a[:8*1024]))
%timeit dview.push(dict(a=a[:4*1024]))
%timeit dview.push(dict(a=a[:2*1024]))
%timeit dview.push(dict(a=a[:1024]))

Calculate the bandwidth for the largest array and the smallest array.

In [None]:
bwmax = len(rc) * 256 * 8 / 9.8e-3
bwmin = len(rc) * 8 / 6.1e-3
print("The bandwidth is between %.2f kB/s and %.2f kB/s." %( bwmin, bwmax))

## Matrix-Matrix Multiplication Using a DirectView

Matrix multiplication is one of the favorites in High Performance Computing (HPC). It's computationally intensive---if done right---, easily parallelized with little communication, and the basis of many real world applications.

Let's say, we have two matrices A and B, where

$$ A = \left ( \begin{array}{cccc}
                4 & 3 & 1 & 6 \\
                1 & 2 & 0 & 3 \\
                7 & 9 & 2 & 0 \\
                2 & 2 & -1 & 4 \\
               \end{array}
       \right ) $$

and 

$$ B = \left ( \begin{array}{cc}
                \frac{1}{12} & \frac{1}{2} \\
                \frac{1}{9}  & \frac{1}{4} \\
                \frac{1}{3}  &      1      \\
                \frac{1}{7}  & -\frac{1}{3}
                \end{array}
       \right ). $$

To calculate the element of $C = A B$ at row *i* and column *j*, we perform a dot (scalar) product of the ith row of A and the jth column of B:

$$ C_{ij} = \sum_k A_{i,k} B_{k, j} $$.

For this to work, the number of columns in $A$ has to be equal to the number of rows in $B$.

We can generate two matrices of size n by n filled with random numbers using ``np.random.random``.

In [None]:
n = 16
A = np.random.random([n, n])
B = np.random.random([n, n])

NumPy includes the dot product. For 2-dimensional arrays ``np.dot`` performs a matrix-matrix multiplication.

In [None]:
C = np.dot(A, B)

In [None]:
%timeit np.dot(A, B)

There are different ways to parallelize a matrix-matrix multiplication. Each element of the matrix can be calculated independently.

In [None]:
%%timeit  
p = len(rc)
# Distribute the elements of the result viewmatrix round robin.
C1h = [[rc[(i * n + j) % p].apply(np.dot, A[i,:], B[:,j]) for j in range(n)] for i in range(n)]
# Wait until the calculation is done
dview.wait()


This, however, produces $n^2$ short tasks and the overhead (latency) is just overwhelming.

We want to calculate

$$ C = \left ( \begin{array}{cccc}
                4 & 3 & 1 & 6 \\
                1 & 2 & 0 & 3 \\
                7 & 9 & 2 & 0 \\
                2 & 2 & -1 & 4 \\
               \end{array}
       \right ) 
              \left ( \begin{array}{cc}
                \frac{1}{12} & \frac{1}{2} \\
                \frac{1}{9}  & \frac{1}{4} \\
                \frac{1}{3}  &      1      \\
                \frac{1}{7}  & -\frac{1}{3}
                \end{array}
       \right ). 
$$

We can split the matrices into tiles. In the above example, we might use a 2 by 2 tile.

$$ C = \left ( \begin{array} {cc}
               a_{00} & a_{01} \\
               a_{10} & a_{11}
               \end{array} \right )
       \left ( \begin{array} {c}
               b_{00} \\
               b_{10}
               \end{array} \right )
     = \left ( \begin{array} {c}
               a_{00} b_{00} + a_{01} b_{10} \\
               a_{10} b_{00} + a_{11} b_{10}
               \end{array} \right )
               ,
$$

where, for example, $a_{00}= \left ( \begin{array}{cc} 4 & 3 \\ 1 & 2 \end{array} \right )$. $a_{00}b_{00}$ is a matrix-matrix product and the addition of two matrices of the same shape is defined element by element.

In our example, we have two $n$ by $n$ matrices and we are going to split them in quadrants.

In [None]:
n = 4096
A = np.random.random([n, n])
B = np.random.random([n, n])

In [None]:
tdot = %timeit -o np.dot(A,B)

In [None]:
type(n // 2)

In [None]:
nhalf = n // 2
a00 = A[:nhalf, :nhalf]
a01 = A[:nhalf, nhalf:]
a10 = A[nhalf:, :nhalf]
a11 = A[nhalf:, nhalf:]
b00 = B[:nhalf, :nhalf]
b01 = B[:nhalf, nhalf:]
b10 = B[nhalf:, :nhalf]
b11 = B[nhalf:, nhalf:]

The calculation of the partial results in Python looks very similar to the mathematical description above:

In [None]:
c00 = np.dot(a00, b00) + np.dot(a01, b10)
c01 = np.dot(a00, b01) + np.dot(a01, b11)
c10 = np.dot(a10, b00) + np.dot(a11, b10)
c11 = np.dot(a10, b01) + np.dot(a11, b11)

In [None]:
%%timeit -o
c00 = np.dot(a00, b00) + np.dot(a01, b10)
c01 = np.dot(a00, b01) + np.dot(a01, b11)
c10 = np.dot(a10, b00) + np.dot(a11, b10)
c11 = np.dot(a10, b01) + np.dot(a11, b11)

In [None]:
_.best / tdot.best

Hm, this is slower than doing it directly...

Next we create one view per engine.

In [None]:
d0 = rc[0]
d1 = rc[1]
d2 = rc[2]
d3 = rc[3]

In [None]:
%timeit d0.apply(lambda A, B : np.dot(A, B), A, B).wait()

In [None]:
c00h = d0.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a00, b00, a01, b10)
c01h = d1.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a00, b01, a01, b11)
c10h = d2.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a10, b00, a11, b10)
c11h = d3.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a10, b01, a11, b11)

In [None]:
c00h.wait()
c01h.wait()
c10h.wait()
c11h.wait()

In [None]:
c00 = c00h.get()
c01 = c01h.get()
c10 = c10h.get()
c11 = c11h.get()

In [None]:
%%timeit
c00h = d0.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a00, b00, a01, b10)
c01h = d1.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a00, b01, a01, b11)
c10h = d2.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a10, b00, a11, b10)
c11h = d3.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a10, b01, a11, b11)
c00h.wait()
c01h.wait()
c10h.wait()
c11h.wait()

Nothing says, we have to stop at 4 tiles nor do we have to use square tiles. We could also recursively subdivide our tiles.

The code is not any faster, because our implementation of numpy already blocks the matrices and uses all cores, but it shows the principle.