# Parallel, Task-Based Computing with Load Balancing on your Local Machine

In our first session [Interactive Parallel Computing on the Local Machine][LocalParallel], we used a direct view to access our engines. This is great as long as we want to do the same task on all engines and don't have many more tasks than engines. If we have many tasks, however, and don't care where each task is executed, the DirectView is not the most convenient view available.

[LocalParallel]: LocalParallel.ipynb

In [None]:
from ipyparallel import Client

In [None]:
rc = Client()

A *direct view* is created by slicing the client. A *load-balanced view* is created by calling rc's method `load_balanced_view()`.

In [None]:
lview = rc.load_balanced_view()

In [None]:
%px import numpy as np
import numpy as np

In [None]:
n = 4096
A = np.random.random([n, n])
B = np.random.random([n, n])
C = np.dot(A, B)

In [None]:
tnp = %timeit -o A@B

In [None]:
a00 = A[:n // 2, :n // 2]
a01 = A[:n // 2, n // 2:]
a10 = A[n // 2:, :n // 2]
a11 = A[n // 2:, n // 2:]
b00 = B[:n // 2, :n // 2]
b01 = B[:n // 2, n // 2:]
b10 = B[n // 2:, :n // 2]
b11 = B[n // 2:, n // 2:]

In [None]:
c00h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a00, b00, a01, b10)
c01h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a00, b01, a01, b11)
c10h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a10, b00, a11, b10)
c11h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a10, b01, a11, b11)

In [None]:
c00h.wait()
c01h.wait()
c10h.wait()
c11h.wait()

In [None]:
c00 = c00h.get()
c01 = c01h.get()
c10 = c10h.get()
c11 = c11h.get()

In [None]:
%%timeit
c00h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a00, b00, a01, b10)
c01h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a00, b01, a01, b11)
c10h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a10, b00, a11, b10)
c11h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a10, b01, a11, b11)
c00h.wait()
c01h.wait()
c10h.wait()
c11h.wait()

Compare this time with the one from the [Interactive Parallel Computing on the Local Machine][LocalParallel] where we used a *direct view*. Is it better? Worse? About the same?

[LocalParallel]: LocalParallel.ipynb#Matrix-Matrix-Multiplication-Using-a-DirectView

It's probably about the same, so why would we use the *load-balanced view*? For starters, we can throw more tasks at our engines than there are workers. In the previous example, we split our matrices in four blocks. Let's write a function that takes a square matrix with n rows and columns, where n is multiple of threshold, that uses tiles of size threshold.

In [None]:
def BlockMatrixMultiply(A, B, threshold = 256):
    """Calculates the matrix product of two square matrices of size :math:`n^2` by dividing
    matrices into smaller blocks.
    
    Parameters
    ----------
    A : ndarray
        A square matrix of size n**2 where n is a power of 2    
    B : ndarray
        A square matrix of size n**2 where n is a power of 2
    threshold: int
        Size of blocks
        
    Returns
    -------
    output : ndarray
        Returns the matrix product of A and B.
    """
    
    if threshold > A.shape[0]:
        threshold = A.shape[0]
    numberOfTiles = A.shape[0] // threshold
    C = np.array([[np.sum([np.dot(A[i*threshold:(i+1)*threshold, k*threshold:(k+1)*threshold], 
                                  B[k*threshold:(k+1)*threshold, j*threshold:(j+1)*threshold]) 
                           for k in range(numberOfTiles)], axis=0) # Add up all submatrices that belong to tile i,j
                       for j in range(numberOfTiles)] # Loop over columns of result matrix
                  for i in range(numberOfTiles)]) # Loop over rows of result matrix

    return C.swapaxes(1,2).reshape(A.shape)

In [None]:
C1 = BlockMatrixMultiply(A, B, n // 2)

In [None]:
np.allclose(C, C1) # Tests is the difference of all array elements is less than some threshold. Use np.allclose? to get details.

In [None]:
%timeit C=np.dot(A,B)

In [None]:
%timeit BlockMatrixMultiply(A, B, n // 2)

In [None]:
def BlockMatrixMultiplyLB(A, B, lview, threshold = 256):
    """Calculates the matrix product of two square matrices of size :math:`n^2` by dividing
    matrices into smaller blocks.
    
    
    Parameters
    ----------
    A : ndarray
        A square matrix of size n**2 where n is a power of 2
    B : ndarray
        A square matrix of size n**2 where n is a power of 2
    threshold: int
        Size of blocks
    view:
        An IPython parallel view
        
    Returns
    -------
    output : ndarray
        Returns the matrix product of A and B.
    """
    if threshold > A.shape[0]:
        threshold = A.shape[0]
    n = A.shape[0] // threshold
    Ch = [ [lview.apply(lambda a, b, threshold, n, i, j : 
               np.sum([np.dot(a[:, k*threshold:(k+1)*threshold], 
                              b[k*threshold:(k+1)*threshold,:]) 
                       for k in range(n)], axis=0), # Add up all the matrices that belong to tile i,j
               A[i*threshold:(i+1)*threshold,:], B[:,j*threshold:(j+1)*threshold], threshold, n, i, j) # Arguments to lambda
           for j in range(n)] # Loop over columns of result matrix
         for i in range(n)] # Loop over rows of result matrix

    #lview.wait() # Let's finish all the work that has been started in this view
    
    # Instead of waiting for the view, we can wait for all our asyncs to finish:
    for r in Ch:
        for c in r:
            c.wait()
            
    return np.array([[c.get() for c in r] for r in Ch]).swapaxes(1,2).reshape(A.shape)

In [None]:
C2 = BlockMatrixMultiplyLB(A, B, lview, n // 4) # Creates 16 tasks

In [None]:
np.allclose(C, C2)

In [None]:
%timeit BlockMatrixMultiplyLB(A, B, lview, n)
%timeit BlockMatrixMultiplyLB(A, B, lview, n // 2) #  4 tasks
%timeit BlockMatrixMultiplyLB(A, B, lview, n // 4) # 16 tasks
%timeit BlockMatrixMultiplyLB(A, B, lview, n // 8) # 64 tasks

In [None]:
BlockMatrixMultiply?