Unverified commit 4116ae3e authored by Thomas Baumann, committed by GitHub

Performance fix for FieldsIO (#538)


* Performance fixes for FieldsIO

* Added missing test markers

* Reverting to non-collective IO if needed

* Doing as much collective IO as possible also with unbalanced distributions

* Small cleanup

* TL: debug the full collective approach

---------

Co-authored-by: Thibaut Lunet <thibaut.lunet@tuhh.de>
parent f4f4df91
@@ -45,9 +45,7 @@ See :class:`pySDC.helpers.fieldsIO.writeFields_MPI` for an illustrative example.
 Warning
 -------
 To use MPI collective writing, you need to call first the class methods :class:`Rectilinear.initMPI` (cf their docstring).
-Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, wether the code is run in parallel or not.
-
-> ⚠️ Also : this module can only be imported with **Python 3.11 or higher** !
+Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, whether the code is run in parallel or not.
 """
 import os
 import numpy as np
@@ -202,7 +200,7 @@ class FieldsIO:
         if not self.ALLOW_OVERWRITE:
             assert not os.path.isfile(
                 self.fileName
-            ), "file already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting"
+            ), f"file {self.fileName!r} already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting"

         with open(self.fileName, "w+b") as f:
             self.hBase.tofile(f)
@@ -475,7 +473,7 @@ class Rectilinear(Scalar):
         Example
         -------
-        >>> # Suppose the FieldsIO object is already writen into outputs.pysdc
+        >>> # Suppose the FieldsIO object is already written into outputs.pysdc
         >>> import os
         >>> from pySDC.utils.fieldsIO import Rectilinear
         >>> os.makedirs("vtrFiles") # to store all VTR files into a subfolder
@@ -494,12 +492,13 @@ class Rectilinear(Scalar):
     # MPI-parallel implementation
     # -------------------------------------------------------------------------
     comm: MPI.Intracomm = None
+    _nCollectiveIO = None

     @classmethod
     def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
         """
         Setup the MPI mode for the files IO, considering a decomposition
-        of the 1D grid into contiuous subintervals.
+        of the 1D grid into contiguous subintervals.

         Parameters
         ----------
@@ -514,6 +513,20 @@ class Rectilinear(Scalar):
         cls.iLoc = iLoc
         cls.nLoc = nLoc
         cls.mpiFile = None
+        cls._nCollectiveIO = None
+
+    @property
+    def nCollectiveIO(self):
+        """
+        Number of collective IO operations over all processes, when reading or writing a field.
+
+        Returns:
+        --------
+        int: Number of collective IO accesses
+        """
+        if self._nCollectiveIO is None:
+            self._nCollectiveIO = self.comm.allreduce(self.nVar * np.prod(self.nLoc[:-1]), op=MPI.MAX)
+        return self._nCollectiveIO

     @property
     def MPI_ON(self):
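The new nCollectiveIO property is the heart of the fix: each rank counts how many chunk accesses it will issue when reading or writing one field (nVar times the product of all but the last local dimension) and an MPI.MAX allreduce gives the largest count over the communicator. Below is a minimal sketch of that counting, not the pySDC implementation; it assumes an mpi4py environment and uses made-up nVar/nLoc values for a hypothetical unbalanced decomposition.

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

nVar = 2
# Hypothetical unbalanced decomposition: rank 0 owns 3 rows, the others 2,
# while the last (contiguous) dimension is never split into separate accesses.
nLoc = [3 if rank == 0 else 2, 16]

myAccesses = nVar * int(np.prod(nLoc[:-1]))             # accesses this rank will issue
nCollectiveIO = comm.allreduce(myAccesses, op=MPI.MAX)  # largest count over all ranks

print(f"rank {rank}: {myAccesses} real accesses, padded up to {nCollectiveIO}")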
@@ -541,7 +554,7 @@
         """Write data (np.ndarray) in the binary file in MPI mode, at the current file cursor position."""
         self.mpiFile.Write(data)

-    def MPI_WRITE_AT(self, offset, data: np.ndarray):
+    def MPI_WRITE_AT_ALL(self, offset, data: np.ndarray):
         """
         Write data in the binary file in MPI mode, with a given offset
         **relative to the beginning of the file**.
@@ -553,9 +566,9 @@
         data : np.ndarray
             Data to be written in the binary file.
         """
-        self.mpiFile.Write_at(offset, data)
+        self.mpiFile.Write_at_all(offset, data)

-    def MPI_READ_AT(self, offset, data):
+    def MPI_READ_AT_ALL(self, offset, data: np.ndarray):
         """
         Read data from the binary file in MPI mode, with a given offset
         **relative to the beginning of the file**.
@@ -567,7 +580,7 @@
         data : np.ndarray
             Array on which to read the data from the binary file.
         """
-        self.mpiFile.Read_at(offset, data)
+        self.mpiFile.Read_at_all(offset, data)

     def MPI_FILE_CLOSE(self):
         """Close the binary file in MPI mode"""
@@ -620,13 +633,22 @@
         offset0 = self.fileSize
         self.MPI_FILE_OPEN(mode="a")
+        nWrites = 0
+        nCollectiveIO = self.nCollectiveIO
+
         if self.MPI_ROOT:
             self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
         offset0 += self.tSize

         for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
             offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
-            self.MPI_WRITE_AT(offset, field[iVar, *iBeg])
+            self.MPI_WRITE_AT_ALL(offset, field[(iVar, *iBeg)])
+            nWrites += 1
+
+        for _ in range(nCollectiveIO - nWrites):
+            # Additional collective write to catch up with other processes
+            self.MPI_WRITE_AT_ALL(offset0, field[:0])

         self.MPI_FILE_CLOSE()

     def iPos(self, iVar, iX):
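Collective calls only work if every rank reaches them the same number of times, which is what the padding loop above guarantees: ranks that own fewer chunks keep issuing zero-size writes until they have matched nCollectiveIO (the read path below uses the identical trick). A self-contained sketch of the same pattern, with a hypothetical unbalanced decomposition and made-up file layout:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

chunk = np.arange(4, dtype=np.float64) + rank
myChunks = 3 if rank == 0 else 2                      # unbalanced decomposition
nCollectiveIO = comm.allreduce(myChunks, op=MPI.MAX)  # every rank must reach this count

fh = MPI.File.Open(comm, "padded.bin", MPI.MODE_CREATE | MPI.MODE_WRONLY)
nWrites = 0
for i in range(myChunks):
    offset = (rank * 3 + i) * chunk.nbytes  # hypothetical disjoint file layout
    fh.Write_at_all(offset, chunk)
    nWrites += 1
for _ in range(nCollectiveIO - nWrites):
    fh.Write_at_all(0, chunk[:0])  # zero-size write: participates, stores nothing
fh.Close()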
@@ -669,9 +691,18 @@
         field = np.empty((self.nVar, *self.nLoc), dtype=self.dtype)
         self.MPI_FILE_OPEN(mode="r")
+        nReads = 0
+        nCollectiveIO = self.nCollectiveIO
+
         for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
             offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
-            self.MPI_READ_AT(offset, field[iVar, *iBeg])
+            self.MPI_READ_AT_ALL(offset, field[(iVar, *iBeg)])
+            nReads += 1
+
+        for _ in range(nCollectiveIO - nReads):
+            # Additional collective read to catch up with other processes
+            self.MPI_READ_AT_ALL(offset0, field[:0])

         self.MPI_FILE_CLOSE()

         return t, field
@@ -684,7 +715,7 @@ def initGrid(nVar, gridSizes):
     dim = len(gridSizes)
     coords = [np.linspace(0, 1, num=n, endpoint=False) for n in gridSizes]
     s = [None] * dim
-    u0 = np.array(np.arange(nVar) + 1)[:, *s]
+    u0 = np.array(np.arange(nVar) + 1)[(slice(None), *s)]
     for x in np.meshgrid(*coords, indexing="ij"):
         u0 = u0 * x
     return coords, u0
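The indexing changes here and in the read/write loops above are the Python-compatibility part of the commit: star-unpacking inside a subscript such as u0[:, *s] is only valid syntax on Python 3.11+ (PEP 646), whereas passing the index tuple explicitly behaves identically on older interpreters. A small illustration with made-up shapes:

import numpy as np

a = np.arange(2 * 3 * 4).reshape(2, 3, 4)
s = [slice(1, 3), slice(0, 2)]

sub = a[(slice(None), *s)]  # valid on any supported Python version
assert sub.shape == (2, 2, 2)
# On Python 3.11+ the same selection can be written as: a[:, *s]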
@@ -706,8 +737,7 @@ def writeFields_MPI(fileName, dtypeIdx, algo, nSteps, nVar, gridSizes):
     iLoc, nLoc = blocks.localBounds
     Rectilinear.setupMPI(comm, iLoc, nLoc)
     s = [slice(i, i + n) for i, n in zip(iLoc, nLoc)]
-    u0 = u0[:, *s]
-    print(MPI_RANK, u0.shape)
+    u0 = u0[(slice(None), *s)]

     f1 = Rectilinear(DTYPES[dtypeIdx], fileName)
     f1.setHeader(nVar=nVar, coords=coords)
@@ -726,6 +756,11 @@ def writeFields_MPI(fileName, dtypeIdx, algo, nSteps, nVar, gridSizes):
 def compareFields_MPI(fileName, u0, nSteps):
     from pySDC.helpers.fieldsIO import FieldsIO

+    comm = MPI.COMM_WORLD
+    MPI_RANK = comm.Get_rank()
+    if MPI_RANK == 0:
+        print("Comparing fields with MPI")
+
     f2 = FieldsIO.fromFile(fileName)
     times = np.arange(nSteps) / nSteps
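For completeness, a hedged sketch of how a written file can be checked afterwards. It assumes a readField(idx) accessor returning (t, field), as suggested by the read hunk above; checkFile and expectedFields are made-up names, and the expected local blocks are supplied by the caller.

import numpy as np
from pySDC.helpers.fieldsIO import FieldsIO

def checkFile(fileName, expectedFields, times):
    """Compare each stored field against the expected local block (made-up helper)."""
    f2 = FieldsIO.fromFile(fileName)
    for idx, (t, uExp) in enumerate(zip(times, expectedFields)):
        tFile, field = f2.readField(idx)  # assumed accessor, cf. the read hunk above
        assert np.isclose(tFile, t), f"wrong time stored for field {idx}"
        assert np.allclose(field, uExp), f"wrong values stored for field {idx}"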
......
@@ -3,9 +3,6 @@ import sys
 import glob
 import pytest

-if sys.version_info < (3, 11):
-    pytest.skip("skipping fieldsIO tests on python lower than 3.11", allow_module_level=True)
-
 import itertools
 import numpy as np
@@ -14,6 +11,7 @@ from pySDC.helpers.fieldsIO import DTYPES, FieldsIO
 FieldsIO.ALLOW_OVERWRITE = True


+@pytest.mark.base
 @pytest.mark.parametrize("dtypeIdx", DTYPES.keys())
 @pytest.mark.parametrize("dim", range(4))
 def testHeader(dim, dtypeIdx):
@@ -65,6 +63,7 @@ def testHeader(dim, dtypeIdx):
         assert np.allclose(val, f2.header[key]), f"header's discrepancy for {key} in written {f2}"


+@pytest.mark.base
 @pytest.mark.parametrize("dtypeIdx", DTYPES.keys())
 @pytest.mark.parametrize("nSteps", [1, 2, 10, 100])
 @pytest.mark.parametrize("nVar", [1, 2, 5])
@@ -106,6 +105,7 @@ def testScalar(nVar, nSteps, dtypeIdx):
         assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values"


+@pytest.mark.base
 @pytest.mark.parametrize("dtypeIdx", DTYPES.keys())
 @pytest.mark.parametrize("nSteps", [1, 2, 5, 10])
 @pytest.mark.parametrize("nVar", [1, 2, 5])
@@ -155,6 +155,7 @@ def testRectilinear(dim, nVar, nSteps, dtypeIdx):
         assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values"


+@pytest.mark.base
 @pytest.mark.parametrize("nSteps", [1, 10])
 @pytest.mark.parametrize("nZ", [1, 5, 16])
 @pytest.mark.parametrize("nY", [1, 5, 16])
@@ -249,7 +250,6 @@ if __name__ == "__main__":
     parser.add_argument('--gridSizes', type=int, nargs='+', help="number of grid points in each dimensions")
     args = parser.parse_args()

-    if sys.version_info >= (3, 11):
-        from pySDC.helpers.fieldsIO import writeFields_MPI, compareFields_MPI
+    from pySDC.helpers.fieldsIO import writeFields_MPI, compareFields_MPI

-        u0 = writeFields_MPI(**args.__dict__)
+    u0 = writeFields_MPI(**args.__dict__)
......