diff --git a/pySDC/helpers/fieldsIO.py b/pySDC/helpers/fieldsIO.py
index fe0e8cbd0ec88e2fe9683cd31fe42bd7b97b3bbd..58e00d3cb4c170174cba81d9826c1e642dba8df9 100644
--- a/pySDC/helpers/fieldsIO.py
+++ b/pySDC/helpers/fieldsIO.py
@@ -44,14 +44,13 @@ See :class:`pySDC.helpers.fieldsIO.writeFields_MPI` for an illustrative example.
 
 Warning
 -------
-To use MPI collective writing, you need to call first the class methods :class:`Rectilinear.initMPI` (cf their docstring).
+To use MPI collective writing, you first need to call the class method :class:`Rectilinear.setupMPI` (cf. its docstring).
 Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, whether the code is run in parallel or not.
 """
 import os
 import numpy as np
 from typing import Type, TypeVar
 import logging
-import itertools
 
 T = TypeVar("T")
 
@@ -61,11 +60,17 @@ try:
     except ImportError:
         pass
     from mpi4py import MPI
+    from mpi4py.util.dtlib import from_numpy_dtype as MPI_DTYPE
 except ImportError:
 
     class MPI:
         COMM_WORLD = None
         Intracomm = T
+        File = T
+        Datatype = T
+
+        def MPI_DTYPE():
+            pass
 
 
 # Supported data types
@@ -412,6 +417,8 @@ class Rectilinear(Scalar):
         coords = self.setupCoords(*coords)
         self.header = {"nVar": int(nVar), "coords": coords}
         self.nItems = nVar * self.nDoF
+        if self.MPI_ON:
+            self.MPI_SETUP()
 
     @property
     def hInfos(self):
@@ -433,6 +440,8 @@ class Rectilinear(Scalar):
         gridSizes = np.fromfile(f, dtype=np.int32, count=dim)
         coords = [np.fromfile(f, dtype=np.float64, count=n) for n in gridSizes]
         self.setHeader(nVar, coords)
+        if self.MPI_ON:
+            self.MPI_SETUP()
 
     def reshape(self, fields: np.ndarray):
         """Reshape the fields to a N-d array (inplace operation)"""
@@ -493,7 +502,6 @@ class Rectilinear(Scalar):
     # MPI-parallel implementation
     # -------------------------------------------------------------------------
     comm: MPI.Intracomm = None
-    _nCollectiveIO = None
 
     @classmethod
     def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
@@ -513,21 +521,9 @@ class Rectilinear(Scalar):
         cls.comm = comm
         cls.iLoc = iLoc
         cls.nLoc = nLoc
-        cls.mpiFile = None
-        cls._nCollectiveIO = None
-
-    @property
-    def nCollectiveIO(self):
-        """
-        Number of collective IO operations over all processes, when reading or writing a field.
-
-        Returns:
-        --------
-        int: Number of collective IO accesses
-        """
-        if self._nCollectiveIO is None:
-            self._nCollectiveIO = self.comm.allreduce(self.nVar * np.prod(self.nLoc[:-1]), op=MPI.MAX)
-        return self._nCollectiveIO
+        cls.mpiFile: MPI.File = None
+        cls.mpiType: MPI.Datatype = None
+        cls.mpiFileType: MPI.Datatype = None
 
     @property
     def MPI_ON(self):
@@ -543,6 +539,16 @@ class Rectilinear(Scalar):
             return True
         return self.comm.Get_rank() == 0
 
+    def MPI_SETUP(self):
+        """Setup the subarray mask for each process"""
+        self.mpiType = MPI_DTYPE(self.dtype)
+        self.mpiFileType = self.mpiType.Create_subarray(
+            [self.nVar, *self.gridSizes],  # Global array sizes
+            [self.nVar, *self.nLoc],  # Local array sizes
+            [0, *self.iLoc],  # Global starting indices of local blocks
+        )
+        self.mpiFileType.Commit()
+
     def MPI_FILE_OPEN(self, mode):
         """Open the binary file in MPI mode"""
         amode = {
@@ -567,7 +573,8 @@ class Rectilinear(Scalar):
         data : np.ndarray
            Data to be written in the binary file.
         """
-        self.mpiFile.Write_at_all(offset, data)
+        self.mpiFile.Set_view(disp=offset, etype=self.mpiType, filetype=self.mpiFileType)
+        self.mpiFile.Write_all(data)
 
     def MPI_READ_AT_ALL(self, offset, data: np.ndarray):
         """
@@ -581,7 +588,8 @@ class Rectilinear(Scalar):
         data : np.ndarray
            Array on which to read the data from the binary file.
         """
-        self.mpiFile.Read_at_all(offset, data)
+        self.mpiFile.Set_view(disp=offset, etype=self.mpiType, filetype=self.mpiFileType)
+        self.mpiFile.Read_all(data)
 
     def MPI_FILE_CLOSE(self):
         """Close the binary file in MPI mode"""
@@ -632,33 +640,15 @@ class Rectilinear(Scalar):
             *self.nLoc,
         ), f"expected {(self.nVar, *self.nLoc)} shape, got {field.shape}"
 
-        offset0 = self.fileSize
+        offset = self.fileSize
         self.MPI_FILE_OPEN(mode="a")
-        nWrites = 0
-        nCollectiveIO = self.nCollectiveIO
 
         if self.MPI_ROOT:
             self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
-        offset0 += self.tSize
-
-        for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
-            offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
-            self.MPI_WRITE_AT_ALL(offset, field[(iVar, *iBeg)])
-            nWrites += 1
-
-        for _ in range(nCollectiveIO - nWrites):
-            # Additional collective write to catch up with other processes
-            self.MPI_WRITE_AT_ALL(offset0, field[:0])
-
+        offset += self.tSize
+        self.MPI_WRITE_AT_ALL(offset, field)
        self.MPI_FILE_CLOSE()
 
-    def iPos(self, iVar, iX):
-        iPos = iVar * self.nDoF
-        for axis in range(self.dim - 1):
-            iPos += (self.iLoc[axis] + iX[axis]) * np.prod(self.gridSizes[axis + 1 :])
-        iPos += self.iLoc[-1]
-        return iPos
-
     def readField(self, idx):
         """
         Read one field stored in the binary file, corresponding to the given
@@ -684,26 +674,15 @@ class Rectilinear(Scalar):
             return super().readField(idx)
         idx = self.formatIndex(idx)
 
-        offset0 = self.hSize + idx * (self.tSize + self.fSize)
+        offset = self.hSize + idx * (self.tSize + self.fSize)
         with open(self.fileName, "rb") as f:
-            t = float(np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset0)[0])
-            offset0 += self.tSize
+            t = float(np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset)[0])
+            offset += self.tSize
 
         field = np.empty((self.nVar, *self.nLoc), dtype=self.dtype)
 
         self.MPI_FILE_OPEN(mode="r")
-        nReads = 0
-        nCollectiveIO = self.nCollectiveIO
-
-        for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
-            offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
-            self.MPI_READ_AT_ALL(offset, field[(iVar, *iBeg)])
-            nReads += 1
-
-        for _ in range(nCollectiveIO - nReads):
-            # Additional collective read to catch up with other processes
-            self.MPI_READ_AT_ALL(offset0, field[:0])
-
+        self.MPI_READ_AT_ALL(offset, field)
         self.MPI_FILE_CLOSE()
 
         return t, field
diff --git a/pySDC/tests/test_benchmarks/test_collocation.py b/pySDC/tests/test_benchmarks/test_collocation.py
index 14cd361d296e728a5e23bb4404b0feadb7552035..2d6963e374978ac0480d4f801063cf845989fff8 100644
--- a/pySDC/tests/test_benchmarks/test_collocation.py
+++ b/pySDC/tests/test_benchmarks/test_collocation.py
@@ -3,8 +3,8 @@ import numpy as np
 
 from pySDC.core.collocation import CollBase
 
-t_start = float(np.random.rand(1) * 0.2)
-t_end = float(0.8 + np.random.rand(1) * 0.2)
+t_start = float(np.random.rand(1)[0] * 0.2)
+t_end = float(0.8 + np.random.rand(1)[0] * 0.2)
 
 tolQuad = 1e-13
 