From e1965bc6d99d2654266a662f085a746f06bc9f09 Mon Sep 17 00:00:00 2001
From: Fahad Khalid <f.khalid@fz-juelich.de>
Date: Sun, 17 Nov 2019 16:44:33 +0100
Subject: [PATCH] Added the data distributed MNIST example, along with the
 partitioned MNIST dataset.

---
 .gitattributes                           |   4 +
 .gitignore                               |   2 -
 datasets/mnist/partitioned/test/x/0.npy  |   3 +
 datasets/mnist/partitioned/test/x/1.npy  |   3 +
 datasets/mnist/partitioned/test/x/2.npy  |   3 +
 datasets/mnist/partitioned/test/x/3.npy  |   3 +
 datasets/mnist/partitioned/test/x/4.npy  |   3 +
 datasets/mnist/partitioned/test/x/5.npy  |   3 +
 datasets/mnist/partitioned/test/x/6.npy  |   3 +
 datasets/mnist/partitioned/test/x/7.npy  |   3 +
 datasets/mnist/partitioned/test/y/0.npy  |   3 +
 datasets/mnist/partitioned/test/y/1.npy  |   3 +
 datasets/mnist/partitioned/test/y/2.npy  |   3 +
 datasets/mnist/partitioned/test/y/3.npy  |   3 +
 datasets/mnist/partitioned/test/y/4.npy  |   3 +
 datasets/mnist/partitioned/test/y/5.npy  |   3 +
 datasets/mnist/partitioned/test/y/6.npy  |   3 +
 datasets/mnist/partitioned/test/y/7.npy  |   3 +
 datasets/mnist/partitioned/train/x/0.npy |   3 +
 datasets/mnist/partitioned/train/x/1.npy |   3 +
 datasets/mnist/partitioned/train/x/2.npy |   3 +
 datasets/mnist/partitioned/train/x/3.npy |   3 +
 datasets/mnist/partitioned/train/x/4.npy |   3 +
 datasets/mnist/partitioned/train/x/5.npy |   3 +
 datasets/mnist/partitioned/train/x/6.npy |   3 +
 datasets/mnist/partitioned/train/x/7.npy |   3 +
 datasets/mnist/partitioned/train/y/0.npy |   3 +
 datasets/mnist/partitioned/train/y/1.npy |   3 +
 datasets/mnist/partitioned/train/y/2.npy |   3 +
 datasets/mnist/partitioned/train/y/3.npy |   3 +
 datasets/mnist/partitioned/train/y/4.npy |   3 +
 datasets/mnist/partitioned/train/y/5.npy |   3 +
 datasets/mnist/partitioned/train/y/6.npy |   3 +
 datasets/mnist/partitioned/train/y/7.npy |   3 +
 horovod/keras/.run_mnist_data_dist       |   3 +
 horovod/keras/mnist_data_distributed.py  | 216 +++++++++++++++++++++++
 36 files changed, 319 insertions(+), 2 deletions(-)
 create mode 100644 datasets/mnist/partitioned/test/x/0.npy
 create mode 100644 datasets/mnist/partitioned/test/x/1.npy
 create mode 100644 datasets/mnist/partitioned/test/x/2.npy
 create mode 100644 datasets/mnist/partitioned/test/x/3.npy
 create mode 100644 datasets/mnist/partitioned/test/x/4.npy
 create mode 100644 datasets/mnist/partitioned/test/x/5.npy
 create mode 100644 datasets/mnist/partitioned/test/x/6.npy
 create mode 100644 datasets/mnist/partitioned/test/x/7.npy
 create mode 100644 datasets/mnist/partitioned/test/y/0.npy
 create mode 100644 datasets/mnist/partitioned/test/y/1.npy
 create mode 100644 datasets/mnist/partitioned/test/y/2.npy
 create mode 100644 datasets/mnist/partitioned/test/y/3.npy
 create mode 100644 datasets/mnist/partitioned/test/y/4.npy
 create mode 100644 datasets/mnist/partitioned/test/y/5.npy
 create mode 100644 datasets/mnist/partitioned/test/y/6.npy
 create mode 100644 datasets/mnist/partitioned/test/y/7.npy
 create mode 100644 datasets/mnist/partitioned/train/x/0.npy
 create mode 100644 datasets/mnist/partitioned/train/x/1.npy
 create mode 100644 datasets/mnist/partitioned/train/x/2.npy
 create mode 100644 datasets/mnist/partitioned/train/x/3.npy
 create mode 100644 datasets/mnist/partitioned/train/x/4.npy
 create mode 100644 datasets/mnist/partitioned/train/x/5.npy
 create mode 100644 datasets/mnist/partitioned/train/x/6.npy
 create mode 100644 datasets/mnist/partitioned/train/x/7.npy
 create mode 100644 datasets/mnist/partitioned/train/y/0.npy
 create mode 100644 datasets/mnist/partitioned/train/y/1.npy
 create mode 100644 datasets/mnist/partitioned/train/y/2.npy
 create mode 100644 datasets/mnist/partitioned/train/y/3.npy
 create mode 100644 datasets/mnist/partitioned/train/y/4.npy
 create mode 100644 datasets/mnist/partitioned/train/y/5.npy
 create mode 100644 datasets/mnist/partitioned/train/y/6.npy
 create mode 100644 datasets/mnist/partitioned/train/y/7.npy
 create mode 100755 horovod/keras/.run_mnist_data_dist
 create mode 100644 horovod/keras/mnist_data_distributed.py

diff --git a/.gitattributes b/.gitattributes
index 775c8fe..dbf6f0e 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -9,3 +9,7 @@ datasets/mnist/raw/t10k-images-idx3-ubyte.gz filter=lfs diff=lfs merge=lfs -text
 datasets/mnist/raw/t10k-labels-idx1-ubyte.gz filter=lfs diff=lfs merge=lfs -text
 datasets/mnist/raw/train-images-idx3-ubyte.gz filter=lfs diff=lfs merge=lfs -text
 datasets/mnist/raw/train-labels-idx1-ubyte.gz filter=lfs diff=lfs merge=lfs -text
+datasets/mnist/partitioned/train/x/*.npy filter=lfs diff=lfs merge=lfs -text
+datasets/mnist/partitioned/train/y/*.npy filter=lfs diff=lfs merge=lfs -text
+datasets/mnist/partitioned/test/x/*.npy filter=lfs diff=lfs merge=lfs -text
+datasets/mnist/partitioned/test/y/*.npy filter=lfs diff=lfs merge=lfs -text
diff --git a/.gitignore b/.gitignore
index 340d044..9c4d6d5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -118,5 +118,3 @@ mnist_convnet_model/
 # Error and output files from the supercomputers
 *.er
 *.out
-
-horovod/keras/mnist_data_distributed.py
diff --git a/datasets/mnist/partitioned/test/x/0.npy b/datasets/mnist/partitioned/test/x/0.npy
new file mode 100644
index 0000000..23c2d04
--- /dev/null
+++ b/datasets/mnist/partitioned/test/x/0.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:d337b85aa5761f401dd8ef5485ec8365a0254febdf02dc75abd97e852e46672b
+size 980128
diff --git a/datasets/mnist/partitioned/test/x/1.npy b/datasets/mnist/partitioned/test/x/1.npy
new file mode 100644
index 0000000..a24cb47
--- /dev/null
+++ b/datasets/mnist/partitioned/test/x/1.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:6b13a84dd3c642af5a36cd64deda794dec5afe57985db5ae1ef70b2ab9c2c3da
+size 980128
diff --git a/datasets/mnist/partitioned/test/x/2.npy b/datasets/mnist/partitioned/test/x/2.npy
new file mode 100644
index 0000000..a4261b3
--- /dev/null
+++ b/datasets/mnist/partitioned/test/x/2.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:ce66171e3c983e0a7ac3bfd1e526fa311e1b635d24f3aaffbe9409037e369d88
+size 980128
diff --git a/datasets/mnist/partitioned/test/x/3.npy b/datasets/mnist/partitioned/test/x/3.npy
new file mode 100644
index 0000000..726e572
--- /dev/null
+++ b/datasets/mnist/partitioned/test/x/3.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:dfbc845e3668427f859c6dd687e1c440ede252bdb3f51349bb0250a7fc920c6a
+size 980128
diff --git a/datasets/mnist/partitioned/test/x/4.npy b/datasets/mnist/partitioned/test/x/4.npy
new file mode 100644
index 0000000..47dc5ed
--- /dev/null
+++ b/datasets/mnist/partitioned/test/x/4.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:3842852bf7abd261b394b32432c97c922c2d4e566ae8a824bec762e96576f974
+size 980128
diff --git a/datasets/mnist/partitioned/test/x/5.npy b/datasets/mnist/partitioned/test/x/5.npy
new file mode 100644
index 0000000..6c18938
--- /dev/null
+++ b/datasets/mnist/partitioned/test/x/5.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:56c0f2d2e2e6b4aae57a7f7bfd9cf4a35c81640095ca96c00f0ba6ea487dccb5
+size 980128
diff --git a/datasets/mnist/partitioned/test/x/6.npy b/datasets/mnist/partitioned/test/x/6.npy
new file mode 100644
index 0000000..2e08250
--- /dev/null
+++ b/datasets/mnist/partitioned/test/x/6.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:2abcd772fa7008092d33452de27d91ee6865f4eedf876e5ad07265a3ec6a33ee
+size 980128
diff --git a/datasets/mnist/partitioned/test/x/7.npy b/datasets/mnist/partitioned/test/x/7.npy
new file mode 100644
index 0000000..7c3fea6
--- /dev/null
+++ b/datasets/mnist/partitioned/test/x/7.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:b7241c5639af8ad3c0197f8f6da20a6a6bea510e5fe8dc340e44a270f6f1efae
+size 980128
diff --git a/datasets/mnist/partitioned/test/y/0.npy b/datasets/mnist/partitioned/test/y/0.npy
new file mode 100644
index 0000000..e73ef7c
--- /dev/null
+++ b/datasets/mnist/partitioned/test/y/0.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:292a20ed0440011df9e0057018d8b0e2712963f206b2356f72294dc56d2cd305
+size 1378
diff --git a/datasets/mnist/partitioned/test/y/1.npy b/datasets/mnist/partitioned/test/y/1.npy
new file mode 100644
index 0000000..1bdbf1b
--- /dev/null
+++ b/datasets/mnist/partitioned/test/y/1.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:003428c745e9bf48a3c297dfce0465a2524557a13f4e8b9cc0c3c52c85b87ce0
+size 1378
diff --git a/datasets/mnist/partitioned/test/y/2.npy b/datasets/mnist/partitioned/test/y/2.npy
new file mode 100644
index 0000000..53ae68c
--- /dev/null
+++ b/datasets/mnist/partitioned/test/y/2.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:d836ac22edbec15473d650cbe049473cc287d31d2b5e229a0b6ddb66edd20057
+size 1378
diff --git a/datasets/mnist/partitioned/test/y/3.npy b/datasets/mnist/partitioned/test/y/3.npy
new file mode 100644
index 0000000..fe6e890
--- /dev/null
+++ b/datasets/mnist/partitioned/test/y/3.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:3074d0129a4571fd780fc94e17925122408aed7fcb5bb5425be00c2c26c7b1e1
+size 1378
diff --git a/datasets/mnist/partitioned/test/y/4.npy b/datasets/mnist/partitioned/test/y/4.npy
new file mode 100644
index 0000000..21cdec7
--- /dev/null
+++ b/datasets/mnist/partitioned/test/y/4.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:16323c30f940623002003fd2ee96029b6c2b1fbb09583408499ada4748a0c537
+size 1378
diff --git a/datasets/mnist/partitioned/test/y/5.npy b/datasets/mnist/partitioned/test/y/5.npy
new file mode 100644
index 0000000..01f1a07
--- /dev/null
+++ b/datasets/mnist/partitioned/test/y/5.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:381c928395e7b1e17d537d4f4d42d752d18e5371a511a69d2038ab4f8d828aa3
+size 1378
diff --git a/datasets/mnist/partitioned/test/y/6.npy b/datasets/mnist/partitioned/test/y/6.npy
new file mode 100644
index 0000000..9d5e67f
--- /dev/null
+++ b/datasets/mnist/partitioned/test/y/6.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:6b6692f8f3e1623f9db7f1f0badda307d121538090418b83ba32a04b666809dc
+size 1378
diff --git a/datasets/mnist/partitioned/test/y/7.npy b/datasets/mnist/partitioned/test/y/7.npy
new file mode 100644
index 0000000..61a67b2
--- /dev/null
+++ b/datasets/mnist/partitioned/test/y/7.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:d535f9dce9161fd74c357de58abc7c4a1ce6364c553837a4351d56f97f26ca53
+size 1378
diff --git a/datasets/mnist/partitioned/train/x/0.npy b/datasets/mnist/partitioned/train/x/0.npy
new file mode 100644
index 0000000..f897477
--- /dev/null
+++ b/datasets/mnist/partitioned/train/x/0.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:76ab9b1dd7a661bea99a3b0dd91e81904b03855ea725e274fe8bd041780cf18f
+size 5880128
diff --git a/datasets/mnist/partitioned/train/x/1.npy b/datasets/mnist/partitioned/train/x/1.npy
new file mode 100644
index 0000000..26f3410
--- /dev/null
+++ b/datasets/mnist/partitioned/train/x/1.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:a4cbee1c3c137ba8fe6033d11a5f097eba3d581b14f813e57d5bb1a39be03b2c
+size 5880128
diff --git a/datasets/mnist/partitioned/train/x/2.npy b/datasets/mnist/partitioned/train/x/2.npy
new file mode 100644
index 0000000..a6a8225
--- /dev/null
+++ b/datasets/mnist/partitioned/train/x/2.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:507c0aa68fb2e56ef97b87f4d4033a51a05657980d1c721fd3247ba6ab78ecdd
+size 5880128
diff --git a/datasets/mnist/partitioned/train/x/3.npy b/datasets/mnist/partitioned/train/x/3.npy
new file mode 100644
index 0000000..603237a
--- /dev/null
+++ b/datasets/mnist/partitioned/train/x/3.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:9f31fb1045ed250ac6d0f1db02181821232ecf9fde11186ffdaffb41aa57f422
+size 5880128
diff --git a/datasets/mnist/partitioned/train/x/4.npy b/datasets/mnist/partitioned/train/x/4.npy
new file mode 100644
index 0000000..bdb707e
--- /dev/null
+++ b/datasets/mnist/partitioned/train/x/4.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:2368ef45033da7492e3b5c5819d534158ca7b776cf4388bc8250bdcefd64bff5
+size 5880128
diff --git a/datasets/mnist/partitioned/train/x/5.npy b/datasets/mnist/partitioned/train/x/5.npy
new file mode 100644
index 0000000..3a7a11b
--- /dev/null
+++ b/datasets/mnist/partitioned/train/x/5.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:208beab7854df575b67b40191720c52866893779ea145e13a6da3e38b8fe7352
+size 5880128
diff --git a/datasets/mnist/partitioned/train/x/6.npy b/datasets/mnist/partitioned/train/x/6.npy
new file mode 100644
index 0000000..f598c3e
--- /dev/null
+++ b/datasets/mnist/partitioned/train/x/6.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:5e5313f1e80e0270b61982dddecaf68dd06cef978b4314af165cdcc39970e164
+size 5880128
diff --git a/datasets/mnist/partitioned/train/x/7.npy b/datasets/mnist/partitioned/train/x/7.npy
new file mode 100644
index 0000000..3a7db3d
--- /dev/null
+++ b/datasets/mnist/partitioned/train/x/7.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:ffe736405b00cef238fd06ffae326aba880400a1cf2e953c8f5ef5543f4e7c06
+size 5880128
diff --git a/datasets/mnist/partitioned/train/y/0.npy b/datasets/mnist/partitioned/train/y/0.npy
new file mode 100644
index 0000000..1fe9885
--- /dev/null
+++ b/datasets/mnist/partitioned/train/y/0.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:581d31f1482fd9ae2ad93bf018a31e24dd0b87c5e8299fa062b5b955ffff7f5e
+size 7628
diff --git a/datasets/mnist/partitioned/train/y/1.npy b/datasets/mnist/partitioned/train/y/1.npy
new file mode 100644
index 0000000..046dacf
--- /dev/null
+++ b/datasets/mnist/partitioned/train/y/1.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:a14441423517041fb8fe61312b60754cd0d4eeb8956ef1060a7f57780a768367
+size 7628
diff --git a/datasets/mnist/partitioned/train/y/2.npy b/datasets/mnist/partitioned/train/y/2.npy
new file mode 100644
index 0000000..b257e23
--- /dev/null
+++ b/datasets/mnist/partitioned/train/y/2.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:a0234fa84a17b72bdd6ae29c87b4d2c1efccaec8f29e79adf9445a9cd008cd12
+size 7628
diff --git a/datasets/mnist/partitioned/train/y/3.npy b/datasets/mnist/partitioned/train/y/3.npy
new file mode 100644
index 0000000..659e670
--- /dev/null
+++ b/datasets/mnist/partitioned/train/y/3.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:83c8af4ccb59b3a0343fd0f43db4b3b6796962fb871fa4a1e3470a7e506469c0
+size 7628
diff --git a/datasets/mnist/partitioned/train/y/4.npy b/datasets/mnist/partitioned/train/y/4.npy
new file mode 100644
index 0000000..a5c22bd
--- /dev/null
+++ b/datasets/mnist/partitioned/train/y/4.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:810b5e113d4913a7414b38294c8508e5607e0ff4a22c1e6c3c6fa7221ac37e40
+size 7628
diff --git a/datasets/mnist/partitioned/train/y/5.npy b/datasets/mnist/partitioned/train/y/5.npy
new file mode 100644
index 0000000..512d28d
--- /dev/null
+++ b/datasets/mnist/partitioned/train/y/5.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:64c9e94f8b223ad58e2b19341d2b2f2d69823703a2bb664ae8a55565157136fc
+size 7628
diff --git a/datasets/mnist/partitioned/train/y/6.npy b/datasets/mnist/partitioned/train/y/6.npy
new file mode 100644
index 0000000..f7ad45e
--- /dev/null
+++ b/datasets/mnist/partitioned/train/y/6.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:9ff847487ba5695f4d39241efdb985aeb4e8e7cc70df5bac309bbe9f5025b292
+size 7628
diff --git a/datasets/mnist/partitioned/train/y/7.npy b/datasets/mnist/partitioned/train/y/7.npy
new file mode 100644
index 0000000..18a15d6
--- /dev/null
+++ b/datasets/mnist/partitioned/train/y/7.npy
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:c47569491fc4db006a4b173b2e4dc6f866390598342fa41b0b57e5b5e5f03ff0
+size 7628
diff --git a/horovod/keras/.run_mnist_data_dist b/horovod/keras/.run_mnist_data_dist
new file mode 100755
index 0000000..b9b19fa
--- /dev/null
+++ b/horovod/keras/.run_mnist_data_dist
@@ -0,0 +1,3 @@
+#!/usr/bin/env bash
+
+PYTHONHASHSEED=0 mpirun -np 1 python -u mnist_data_distributed.py
diff --git a/horovod/keras/mnist_data_distributed.py b/horovod/keras/mnist_data_distributed.py
new file mode 100644
index 0000000..c1632cd
--- /dev/null
+++ b/horovod/keras/mnist_data_distributed.py
@@ -0,0 +1,216 @@
+# Copyright (c) 2019 Forschungszentrum Juelich GmbH.
+# This code is licensed under MIT license (see the LICENSE file for details).
+
+"""
+    This program distributes the partitioned MNIST data across
+    multiple MPI ranks for truly data distributed training of a shallow ANN
+    for handwritten digit classification.
+
+    The Horovod framework is used for seamless distributed training. Instead
+    of distributing epochs, this program distributes data amongst the ranks,
+    so that each rank contributes training based on its local subset of the
+    training data.
+
+"""
+
+import os
+import sys
+
+import mpi4py
+import numpy as np
+import tensorflow as tf
+import horovod.tensorflow.keras as hvd
+from tensorflow.python.keras import backend as K
+
+from slns.errors import MpiInitError
+from slns.distribution import DataDistributor
+
+sys.path.insert(0, '../../utils')
+from data_utils import DataValidator
+
+
+def get_filenames(path):
+    """
+    Returns a list of names of files available on the given path.
+
+    :param path: str. Valid path to an existing directory.
+
+    :return: list. A list of filenames, where each filename is
+                   of type str.
+    """
+
+    absolute_path = os.path.join(os.path.abspath(f'{path}/x'))
+
+    return os.listdir(absolute_path)
+
+
+def get_concatenated_data(path, filenames):
+    """
+    Loads all files with the given filenames from the given path,
+    and concatenates all the loaded tensors into one large
+    tensor.
+
+    :param path: str. Valid path to an existing directory.
+    :param filenames: list. A list of filenames, where each filename is
+                   of type str.
+
+    :return: np.ndarray. A tensor with all the loaded content.
+    """
+
+    arrays = [
+        np.load(os.path.join(path, f)) for f in filenames
+    ]
+
+    return np.concatenate(arrays)
+
+
+def load_dataset(path, filenames):
+    """
+    Loads the input data and the corresponding labels as
+    two np.ndarray types, and returns these as a tuple.
+
+    :param path: str. Valid path to an existing directory.
+    :param filenames: list. A list of filenames, where each filename is
+                   of type str.
+
+    :return: Tuple consisting of two np.ndarray types. The value at
+             the first tuple index is the input tensor, while the
+             other value is the corresponding array of labels.
+    """
+
+    x_dir = os.path.join(os.path.abspath(f'{path}/x'))
+    y_dir = os.path.join(os.path.abspath(f'{path}/y'))
+
+    x = get_concatenated_data(x_dir, filenames)
+    y = get_concatenated_data(y_dir, filenames)
+
+    return x, y
+
+
+def initialize_hvd_and_mpi():
+    """
+    Configure and initialize Horovod and MPI. Also, make sure there
+    are no conflicts between Horovod and mpi4py communicator
+    initialization.
+
+    :exception: slns.errors.MpiInitError is raised in the case
+                of initialization failure.
+    """
+
+    # Initialize Horovod.
+    hvd.init()
+
+    # Pin the GPU to be used to process local rank (one GPU per process)
+    tf_config = tf.ConfigProto()
+    tf_config.gpu_options.allow_growth = True
+    tf_config.gpu_options.visible_device_list = str(hvd.local_rank())
+    K.set_session(tf.Session(config=tf_config))
+
+    # Verify that MPI multi-threading is supported. Horovod cannot work
+    # with mpi4py (or any other MPI library) otherwise.
+    # More info on MPI multi-threading:
+    # https://www.mcs.anl.gov/research/projects/mpi/mpi-standard/mpi-report-2.0/node163.htm#Node163
+    if not hvd.mpi_threads_supported():
+        raise MpiInitError(
+            'MPI multi-threading is not supported. Horovod cannot work with mpi4py '
+            'in this case. Please enable MPI multi-threading and try again.'
+        )
+
+    # Disable automatic MPI initialization on importing mpi4py.MPI,
+    # as we are relying on Horovod to take care of the initialization.
+    mpi4py.rc.initialize = False
+
+    # Verify that Horovod and mpi4py are using the same number of ranks
+    from mpi4py import MPI
+    if hvd.size() != MPI.COMM_WORLD.Get_size():
+        raise MpiInitError(
+            'Mismatch in hvd.size() and MPI.COMM_WORLD size.'
+            f' No. of ranks in Horovod: {hvd.size()}.'
+            f' No. of ranks in mpi4py: {MPI.COMM_WORLD.Get_size()}'
+        )
+
+
+def main():
+    """ Orchestrates the distributed training program. """
+
+    # Configure and initialize Horovod and mpi4py
+    initialize_hvd_and_mpi()
+
+    # Flag to indicate whether this is the MPI root
+    is_root = hvd.rank() == 0
+
+    dist_decorator = DataDistributor(
+        mpi_comm=mpi4py.MPI.COMM_WORLD, shutdown_on_error=True
+    )
+    get_rank_local_filenames = dist_decorator(get_filenames)
+
+    data_sub_dir = 'mnist/partitioned'
+    data_dir = DataValidator.validated_data_dir(data_sub_dir)
+
+    train_filenames = get_rank_local_filenames(
+        f'{os.path.join(data_dir, data_sub_dir)}/train')
+    x_train, y_train = load_dataset(
+        f'{os.path.join(data_dir, data_sub_dir)}/train', train_filenames)
+
+    # Normalize input samples
+    x_train = x_train / 255.0
+
+    if is_root:
+        test_filenames = get_filenames(
+            f'{os.path.join(data_dir, data_sub_dir)}/test')
+        x_test, y_test = load_dataset(
+            f'{os.path.join(data_dir, data_sub_dir)}/test', test_filenames)
+        x_test = x_test / 255.0
+    else:
+        x_test, y_test = None, None
+
+    # Define the model, i.e., the network
+    model = tf.keras.models.Sequential([
+        tf.keras.layers.Flatten(),
+        tf.keras.layers.Dense(512, activation=tf.nn.relu),
+        tf.keras.layers.Dense(10, activation=tf.nn.softmax)
+    ])
+
+    # Optimizer
+    optimizer = tf.keras.optimizers.Adam()
+
+    # Horovod: add Horovod Distributed Optimizer.
+    optimizer = hvd.DistributedOptimizer(optimizer)
+
+    # Compile the model
+    model.compile(
+        optimizer=optimizer,
+        loss='sparse_categorical_crossentropy',
+        metrics=['accuracy']
+    )
+
+    # Fixed No. of epochs
+    epochs = 24
+
+    # Training callbacks
+    callbacks = [
+        # Horovod: broadcast initial variable states from rank 0 to all other processes.
+        # This is necessary to ensure consistent initialization of all workers when
+        # training is started with random weights or restored from a checkpoint.
+        hvd.callbacks.BroadcastGlobalVariablesCallback(0)
+    ]
+
+    # Train the model using the training set
+    model.fit(
+        x=x_train,
+        y=y_train,
+        batch_size=32,
+        epochs=epochs,
+        verbose=1 if is_root else 0,
+        callbacks=callbacks
+    )
+
+    if is_root:
+        # Test the model on the test set
+        score = model.evaluate(x=x_test, y=y_test, verbose=0)
+        print('Test loss:', score[0])
+        print('Test accuracy:', score[1])
+
+
+if __name__ == '__main__':
+    main()
-- 
GitLab