Select Git revision
plot_helper.py
-
Thomas Baumann authoredThomas Baumann authored
09_LocalTaskParallel.ipynb NaN GiB
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Parallel, Task-Based Computing with Load Balancing on your Local Machine"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In our first session [Interactive Parallel Computing on the Local Machine][LocalParallel], we used a direct view to access our engines. This is great as long as we want to do the same task on all engines and don't have many more tasks than engines. If we have many tasks, however, and don't care where each task is executed, the DirectView is not the most convenient view available.\n",
"\n",
"[LocalParallel]: LocalParallel.ipynb"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from ipyparallel import Client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"rc = Client()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A *direct view* is created by slicing the client. A *load-balanced view* is created by calling rc's method `load_balanced_view()`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"lview = rc.load_balanced_view()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%px import numpy as np\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"n = 4096\n",
"A = np.random.random([n, n])\n",
"B = np.random.random([n, n])\n",
"C = np.dot(A, B)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tnp = %timeit -o A@B"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"a00 = A[:n // 2, :n // 2]\n",
"a01 = A[:n // 2, n // 2:]\n",
"a10 = A[n // 2:, :n // 2]\n",
"a11 = A[n // 2:, n // 2:]\n",
"b00 = B[:n // 2, :n // 2]\n",
"b01 = B[:n // 2, n // 2:]\n",
"b10 = B[n // 2:, :n // 2]\n",
"b11 = B[n // 2:, n // 2:]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"c00h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a00, b00, a01, b10)\n",
"c01h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a00, b01, a01, b11)\n",
"c10h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a10, b00, a11, b10)\n",
"c11h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a10, b01, a11, b11)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"c00h.wait()\n",
"c01h.wait()\n",
"c10h.wait()\n",
"c11h.wait()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"c00 = c00h.get()\n",
"c01 = c01h.get()\n",
"c10 = c10h.get()\n",
"c11 = c11h.get()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%timeit\n",
"c00h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a00, b00, a01, b10)\n",
"c01h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a00, b01, a01, b11)\n",
"c10h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a10, b00, a11, b10)\n",
"c11h = lview.apply(lambda a, b, c, d : np.dot(a, b) + np.dot(c, d), a10, b01, a11, b11)\n",
"c00h.wait()\n",
"c01h.wait()\n",
"c10h.wait()\n",
"c11h.wait()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Compare this time with the one from the [Interactive Parallel Computing on the Local Machine][LocalParallel] where we used a *direct view*. Is it better? Worse? About the same?\n",
"\n",
"[LocalParallel]: LocalParallel.ipynb#Matrix-Matrix-Multiplication-Using-a-DirectView"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It's probably about the same, so why would we use the *load-balanced view*? For starters, we can throw more tasks at our engines than there are workers. In the previous example, we split our matrices in four blocks. Let's write a function that takes a square matrix with n rows and columns, where n is multiple of threshold, that uses tiles of size threshold."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def BlockMatrixMultiply(A, B, threshold = 256):\n",
" \"\"\"Calculates the matrix product of two square matrices of size :math:`n^2` by dividing\n",
" matrices into smaller blocks.\n",
" \n",
" Parameters\n",
" ----------\n",
" A : ndarray\n",
" A square matrix of size n**2 where n is a power of 2 \n",
" B : ndarray\n",
" A square matrix of size n**2 where n is a power of 2\n",
" threshold: int\n",
" Size of blocks\n",
" \n",
" Returns\n",
" -------\n",
" output : ndarray\n",
" Returns the matrix product of A and B.\n",
" \"\"\"\n",
" \n",
" if threshold > A.shape[0]:\n",
" threshold = A.shape[0]\n",
" numberOfTiles = A.shape[0] // threshold\n",
" C = np.array([[np.sum([np.dot(A[i*threshold:(i+1)*threshold, k*threshold:(k+1)*threshold], \n",
" B[k*threshold:(k+1)*threshold, j*threshold:(j+1)*threshold]) \n",
" for k in range(numberOfTiles)], axis=0) # Add up all submatrices that belong to tile i,j\n",
" for j in range(numberOfTiles)] # Loop over columns of result matrix\n",
" for i in range(numberOfTiles)]) # Loop over rows of result matrix\n",
"\n",
" return C.swapaxes(1,2).reshape(A.shape)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"C1 = BlockMatrixMultiply(A, B, n // 2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"np.allclose(C, C1) # Tests is the difference of all array elements is less than some threshold. Use np.allclose? to get details."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%timeit C=np.dot(A,B)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%timeit BlockMatrixMultiply(A, B, n // 2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def BlockMatrixMultiplyLB(A, B, lview, threshold = 256):\n",
" \"\"\"Calculates the matrix product of two square matrices of size :math:`n^2` by dividing\n",
" matrices into smaller blocks.\n",
" \n",
" \n",
" Parameters\n",
" ----------\n",
" A : ndarray\n",
" A square matrix of size n**2 where n is a power of 2\n",
" B : ndarray\n",
" A square matrix of size n**2 where n is a power of 2\n",
" threshold: int\n",
" Size of blocks\n",
" view:\n",
" An IPython parallel view\n",
" \n",
" Returns\n",
" -------\n",
" output : ndarray\n",
" Returns the matrix product of A and B.\n",
" \"\"\"\n",
" if threshold > A.shape[0]:\n",
" threshold = A.shape[0]\n",
" n = A.shape[0] // threshold\n",
" Ch = [ [lview.apply(lambda a, b, threshold, n, i, j : \n",
" np.sum([np.dot(a[:, k*threshold:(k+1)*threshold], \n",
" b[k*threshold:(k+1)*threshold,:]) \n",
" for k in range(n)], axis=0), # Add up all the matrices that belong to tile i,j\n",
" A[i*threshold:(i+1)*threshold,:], B[:,j*threshold:(j+1)*threshold], threshold, n, i, j) # Arguments to lambda\n",
" for j in range(n)] # Loop over columns of result matrix\n",
" for i in range(n)] # Loop over rows of result matrix\n",
"\n",
" #lview.wait() # Let's finish all the work that has been started in this view\n",
" \n",
" # Instead of waiting for the view, we can wait for all our asyncs to finish:\n",
" for r in Ch:\n",
" for c in r:\n",
" c.wait()\n",
" \n",
" return np.array([[c.get() for c in r] for r in Ch]).swapaxes(1,2).reshape(A.shape)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"C2 = BlockMatrixMultiplyLB(A, B, lview, n // 4) # Creates 16 tasks"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"np.allclose(C, C2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%timeit BlockMatrixMultiplyLB(A, B, lview, n)\n",
"%timeit BlockMatrixMultiplyLB(A, B, lview, n // 2) # 4 tasks\n",
"%timeit BlockMatrixMultiplyLB(A, B, lview, n // 4) # 16 tasks\n",
"%timeit BlockMatrixMultiplyLB(A, B, lview, n // 8) # 64 tasks"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"BlockMatrixMultiply?"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "HPC Python 2021",
"language": "python",
"name": "hpcpy21"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.5"
}
},
"nbformat": 4,
"nbformat_minor": 4
}