diff --git a/pySDC/helpers/fieldsIO.py b/pySDC/helpers/fieldsIO.py index 3b124dd109aae19a109f88cbccddcafaa04d5d16..5ac312ac07499f37abc3555e2fdcd59a20a273f2 100644 --- a/pySDC/helpers/fieldsIO.py +++ b/pySDC/helpers/fieldsIO.py @@ -45,9 +45,7 @@ See :class:`pySDC.helpers.fieldsIO.writeFields_MPI` for an illustrative example. Warning ------- To use MPI collective writing, you need to call first the class methods :class:`Rectilinear.initMPI` (cf their docstring). -Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, wether the code is run in parallel or not. - -> ⚠️ Also : this module can only be imported with **Python 3.11 or higher** ! +Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, whether the code is run in parallel or not. """ import os import numpy as np @@ -202,7 +200,7 @@ class FieldsIO: if not self.ALLOW_OVERWRITE: assert not os.path.isfile( self.fileName - ), "file already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting" + ), f"file {self.fileName!r} already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting" with open(self.fileName, "w+b") as f: self.hBase.tofile(f) @@ -475,7 +473,7 @@ class Rectilinear(Scalar): Example ------- - >>> # Suppose the FieldsIO object is already writen into outputs.pysdc + >>> # Suppose the FieldsIO object is already written into outputs.pysdc >>> import os >>> from pySDC.utils.fieldsIO import Rectilinear >>> os.makedirs("vtrFiles") # to store all VTR files into a subfolder @@ -494,12 +492,13 @@ class Rectilinear(Scalar): # MPI-parallel implementation # ------------------------------------------------------------------------- comm: MPI.Intracomm = None + _nCollectiveIO = None @classmethod def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc): """ Setup the MPI mode for the files IO, considering a decomposition - of the 1D grid into contiuous subintervals. + of the 1D grid into contiguous subintervals. Parameters ---------- @@ -514,6 +513,20 @@ class Rectilinear(Scalar): cls.iLoc = iLoc cls.nLoc = nLoc cls.mpiFile = None + cls._nCollectiveIO = None + + @property + def nCollectiveIO(self): + """ + Number of collective IO operations over all processes, when reading or writing a field. + + Returns: + -------- + int: Number of collective IO accesses + """ + if self._nCollectiveIO is None: + self._nCollectiveIO = self.comm.allreduce(self.nVar * np.prod(self.nLoc[:-1]), op=MPI.MAX) + return self._nCollectiveIO @property def MPI_ON(self): @@ -541,7 +554,7 @@ class Rectilinear(Scalar): """Write data (np.ndarray) in the binary file in MPI mode, at the current file cursor position.""" self.mpiFile.Write(data) - def MPI_WRITE_AT(self, offset, data: np.ndarray): + def MPI_WRITE_AT_ALL(self, offset, data: np.ndarray): """ Write data in the binary file in MPI mode, with a given offset **relative to the beginning of the file**. @@ -553,9 +566,9 @@ class Rectilinear(Scalar): data : np.ndarray Data to be written in the binary file. """ - self.mpiFile.Write_at(offset, data) + self.mpiFile.Write_at_all(offset, data) - def MPI_READ_AT(self, offset, data): + def MPI_READ_AT_ALL(self, offset, data: np.ndarray): """ Read data from the binary file in MPI mode, with a given offset **relative to the beginning of the file**. @@ -567,7 +580,7 @@ class Rectilinear(Scalar): data : np.ndarray Array on which to read the data from the binary file. """ - self.mpiFile.Read_at(offset, data) + self.mpiFile.Read_at_all(offset, data) def MPI_FILE_CLOSE(self): """Close the binary file in MPI mode""" @@ -620,13 +633,22 @@ class Rectilinear(Scalar): offset0 = self.fileSize self.MPI_FILE_OPEN(mode="a") + nWrites = 0 + nCollectiveIO = self.nCollectiveIO + if self.MPI_ROOT: self.MPI_WRITE(np.array(time, dtype=T_DTYPE)) offset0 += self.tSize for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]): offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize - self.MPI_WRITE_AT(offset, field[iVar, *iBeg]) + self.MPI_WRITE_AT_ALL(offset, field[(iVar, *iBeg)]) + nWrites += 1 + + for _ in range(nCollectiveIO - nWrites): + # Additional collective write to catch up with other processes + self.MPI_WRITE_AT_ALL(offset0, field[:0]) + self.MPI_FILE_CLOSE() def iPos(self, iVar, iX): @@ -669,9 +691,18 @@ class Rectilinear(Scalar): field = np.empty((self.nVar, *self.nLoc), dtype=self.dtype) self.MPI_FILE_OPEN(mode="r") + nReads = 0 + nCollectiveIO = self.nCollectiveIO + for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]): offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize - self.MPI_READ_AT(offset, field[iVar, *iBeg]) + self.MPI_READ_AT_ALL(offset, field[(iVar, *iBeg)]) + nReads += 1 + + for _ in range(nCollectiveIO - nReads): + # Additional collective read to catch up with other processes + self.MPI_READ_AT_ALL(offset0, field[:0]) + self.MPI_FILE_CLOSE() return t, field @@ -684,7 +715,7 @@ def initGrid(nVar, gridSizes): dim = len(gridSizes) coords = [np.linspace(0, 1, num=n, endpoint=False) for n in gridSizes] s = [None] * dim - u0 = np.array(np.arange(nVar) + 1)[:, *s] + u0 = np.array(np.arange(nVar) + 1)[(slice(None), *s)] for x in np.meshgrid(*coords, indexing="ij"): u0 = u0 * x return coords, u0 @@ -706,8 +737,7 @@ def writeFields_MPI(fileName, dtypeIdx, algo, nSteps, nVar, gridSizes): iLoc, nLoc = blocks.localBounds Rectilinear.setupMPI(comm, iLoc, nLoc) s = [slice(i, i + n) for i, n in zip(iLoc, nLoc)] - u0 = u0[:, *s] - print(MPI_RANK, u0.shape) + u0 = u0[(slice(None), *s)] f1 = Rectilinear(DTYPES[dtypeIdx], fileName) f1.setHeader(nVar=nVar, coords=coords) @@ -726,6 +756,11 @@ def writeFields_MPI(fileName, dtypeIdx, algo, nSteps, nVar, gridSizes): def compareFields_MPI(fileName, u0, nSteps): from pySDC.helpers.fieldsIO import FieldsIO + comm = MPI.COMM_WORLD + MPI_RANK = comm.Get_rank() + if MPI_RANK == 0: + print("Comparing fields with MPI") + f2 = FieldsIO.fromFile(fileName) times = np.arange(nSteps) / nSteps diff --git a/pySDC/tests/test_helpers/test_fieldsIO.py b/pySDC/tests/test_helpers/test_fieldsIO.py index 1e75505d0faab3f8c062a251466fe1a2b378c4b4..2878be5fe71f6994c45c30ed9e1598e170232a40 100644 --- a/pySDC/tests/test_helpers/test_fieldsIO.py +++ b/pySDC/tests/test_helpers/test_fieldsIO.py @@ -3,9 +3,6 @@ import sys import glob import pytest -if sys.version_info < (3, 11): - pytest.skip("skipping fieldsIO tests on python lower than 3.11", allow_module_level=True) - import itertools import numpy as np @@ -14,6 +11,7 @@ from pySDC.helpers.fieldsIO import DTYPES, FieldsIO FieldsIO.ALLOW_OVERWRITE = True +@pytest.mark.base @pytest.mark.parametrize("dtypeIdx", DTYPES.keys()) @pytest.mark.parametrize("dim", range(4)) def testHeader(dim, dtypeIdx): @@ -65,6 +63,7 @@ def testHeader(dim, dtypeIdx): assert np.allclose(val, f2.header[key]), f"header's discrepancy for {key} in written {f2}" +@pytest.mark.base @pytest.mark.parametrize("dtypeIdx", DTYPES.keys()) @pytest.mark.parametrize("nSteps", [1, 2, 10, 100]) @pytest.mark.parametrize("nVar", [1, 2, 5]) @@ -106,6 +105,7 @@ def testScalar(nVar, nSteps, dtypeIdx): assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values" +@pytest.mark.base @pytest.mark.parametrize("dtypeIdx", DTYPES.keys()) @pytest.mark.parametrize("nSteps", [1, 2, 5, 10]) @pytest.mark.parametrize("nVar", [1, 2, 5]) @@ -155,6 +155,7 @@ def testRectilinear(dim, nVar, nSteps, dtypeIdx): assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values" +@pytest.mark.base @pytest.mark.parametrize("nSteps", [1, 10]) @pytest.mark.parametrize("nZ", [1, 5, 16]) @pytest.mark.parametrize("nY", [1, 5, 16]) @@ -249,8 +250,7 @@ if __name__ == "__main__": parser.add_argument('--gridSizes', type=int, nargs='+', help="number of grid points in each dimensions") args = parser.parse_args() - if sys.version_info >= (3, 11): - from pySDC.helpers.fieldsIO import writeFields_MPI, compareFields_MPI + from pySDC.helpers.fieldsIO import writeFields_MPI, compareFields_MPI - u0 = writeFields_MPI(**args.__dict__) - compareFields_MPI(args.fileName, u0, args.nSteps) + u0 = writeFields_MPI(**args.__dict__) + compareFields_MPI(args.fileName, u0, args.nSteps)