diff --git a/.github/workflows/ci_pipeline.yml b/.github/workflows/ci_pipeline.yml index dee29be4f420dfa0b540cc8ac792f57086f0eb5a..4ba3557084da3aae8adf9876d31f5e1af0713690 100644 --- a/.github/workflows/ci_pipeline.yml +++ b/.github/workflows/ci_pipeline.yml @@ -55,6 +55,9 @@ jobs: - name: Install additional packages as needed run: | micromamba install -y --file etc/environment-tests.yml --freeze-installed + - name: Install pySDC as a package in the current environment + run: | + pip install --no-deps -e . - name: Run pytest for CPU stuff run: | echo "print('Loading sitecustomize.py...') diff --git a/.gitignore b/.gitignore index 0f586897a1074cbdae01e27beee0c4dc3c06a40b..845782e782aa642c793b5dd212e503eda6c7142e 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ step_*.png *.swp *_data.json !_dataRef.json +*.pysdc # Created by https://www.gitignore.io diff --git a/pySDC/helpers/blocks.py b/pySDC/helpers/blocks.py new file mode 100644 index 0000000000000000000000000000000000000000..80482e712e402fd99c2f3a769e63e1178bb20014 --- /dev/null +++ b/pySDC/helpers/blocks.py @@ -0,0 +1,138 @@ +class BlockDecomposition(object): + """ + Class decomposing a cartesian space domain (1D to 3D) into a given number of processors. + + Parameters + ---------- + nProcs : int + Total number of processors for space block decomposition. + gridSizes : list[int] + Number of grid points in each dimension + algo : str, optional + Algorithm used for the block decomposition : + + - Hybrid : approach minimizing interface communication, inspired from + the `[Hybrid CFD solver] <https://web.stanford.edu/group/ctr/ResBriefs07/5_larsson1_pp47_58.pdf>`_. + - ChatGPT : quickly generated using `[ChatGPT] <https://chatgpt.com>`_. + The default is "Hybrid". + gRank : int, optional + If provided, the global rank that will determine the local block distribution. Default is None. + order : str, optional + The order used when computing the rank block distribution. Default is `C`. 
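+
+    Example
+    -------
+    Minimal sketch with illustrative values (4 processors, a 256 x 64 grid, global rank 1),
+    using the default "Hybrid" algorithm :
+
+    >>> from pySDC.helpers.blocks import BlockDecomposition
+    >>> blocks = BlockDecomposition(nProcs=4, gridSizes=[256, 64], gRank=1)
+    >>> blocks.nBlocks
+    [4, 1]
+    >>> blocks.ranks
+    (1, 0)
+    >>> blocks.localBounds
+    ([64, 0], [64, 64])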
+ """ + + def __init__(self, nProcs, gridSizes, algo="Hybrid", gRank=None, order="C"): + dim = len(gridSizes) + assert dim in [1, 2, 3], "block decomposition only works for 1D, 2D or 3D domains" + + if algo == "ChatGPT": + + nBlocks = [1] * dim + for i in range(2, int(nProcs**0.5) + 1): + while nProcs % i == 0: + nBlocks[0] *= i + nProcs //= i + nBlocks.sort() + + if nProcs > 1: + nBlocks[0] *= nProcs + + nBlocks.sort() + while len(nBlocks) < dim: + smallest = nBlocks.pop(0) + nBlocks += [1, smallest] + nBlocks.sort() + + while len(nBlocks) > dim: + smallest = nBlocks.pop(0) + next_smallest = nBlocks.pop(0) + nBlocks.append(smallest * next_smallest) + nBlocks.sort() + + elif algo == "Hybrid": + rest = nProcs + facs = { + 1: [1], + 2: [2, 1], + 3: [2, 3, 1], + }[dim] + exps = [0] * dim + for n in range(dim - 1): + while (rest % facs[n]) == 0: + exps[n] = exps[n] + 1 + rest = rest // facs[n] + if rest > 1: + facs[dim - 1] = rest + exps[dim - 1] = 1 + + nBlocks = [1] * dim + for n in range(dim - 1, -1, -1): + while exps[n] > 0: + dummymax = -1 + dmax = 0 + for d, nPts in enumerate(gridSizes): + dummy = (nPts + nBlocks[d] - 1) // nBlocks[d] + if dummy >= dummymax: + dummymax = dummy + dmax = d + nBlocks[dmax] = nBlocks[dmax] * facs[n] + exps[n] = exps[n] - 1 + + else: + raise NotImplementedError(f"algo={algo}") + + # Store attributes + self.dim = dim + self.nBlocks = nBlocks + self.gridSizes = gridSizes + + # Used for rank block distribution + self.gRank = gRank + self.order = order + + @property + def ranks(self): + gRank, order = self.gRank, self.order + assert gRank is not None, "gRank attribute need to be set" + dim, nBlocks = self.dim, self.nBlocks + if dim == 1: + return (gRank,) + elif dim == 2: + div = nBlocks[-1] if order == "C" else nBlocks[0] + return (gRank // div, gRank % div) + else: + raise NotImplementedError(f"dim={dim}") + + @property + def localBounds(self): + iLocList, nLocList = [], [] + for rank, nPoints, nBlocks in zip(self.ranks, self.gridSizes, self.nBlocks): + n0 = nPoints // nBlocks + nRest = nPoints - nBlocks * n0 + nLoc = n0 + 1 * (rank < nRest) + iLoc = rank * n0 + nRest * (rank >= nRest) + rank * (rank < nRest) + + iLocList.append(iLoc) + nLocList.append(nLoc) + return iLocList, nLocList + + +if __name__ == "__main__": + # Base usage of this module for a 2D decomposition + from mpi4py import MPI + from time import sleep + + comm: MPI.Intracomm = MPI.COMM_WORLD + MPI_SIZE = comm.Get_size() + MPI_RANK = comm.Get_rank() + + blocks = BlockDecomposition(MPI_SIZE, [256, 64], gRank=MPI_RANK) + if MPI_RANK == 0: + print(f"nBlocks : {blocks.nBlocks}") + + ranks = blocks.ranks + bounds = blocks.localBounds + + comm.Barrier() + sleep(0.01 * MPI_RANK) + print(f"[Rank {MPI_RANK}] pRankX={ranks}, bounds={bounds}") diff --git a/pySDC/helpers/fieldsIO.py b/pySDC/helpers/fieldsIO.py new file mode 100644 index 0000000000000000000000000000000000000000..4507a8bdbccc223f2c0fe8c927418a8b2758b778 --- /dev/null +++ b/pySDC/helpers/fieldsIO.py @@ -0,0 +1,753 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Generic utility class to write and read cartesian grid field solutions into binary files. 
+It implements the base file handler class :class:`FieldsIO`, which is specialized into :
+
+- :class:`Scal0D` : for 0D fields (scalar) with a given number of variables
+- :class:`Cart1D` : for 1D fields with a given number of variables
+- :class:`Cart2D` : for 2D fields on a cartesian grid with a given number of variables
+
+While each file handler needs to be set up with specific parameters (grid, ...),
+each written file can be read using the same interface implemented in the
+base abstract class.
+
+Example
+-------
+>>> import numpy as np
+>>> from pySDC.helpers.fieldsIO import Cart1D, FieldsIO
+>>>
+>>> # Write some fields in files
+>>> x = np.linspace(0, 1, 101)
+>>> fOut = Cart1D(np.float64, "file.pysdc")
+>>> fOut.setHeader(nVar=2, coordX=x)
+>>> fOut.initialize()
+>>> times = [0, 1, 2]
+>>> u0 = np.array([-1, 1])[:, None]*x[None, :]
+>>> for t in times:
+...     fOut.addField(t, t*u0)
+>>>
+>>> # Read the file using the generic interface
+>>> fIn = FieldsIO.fromFile("file.pysdc")
+>>> times = fIn.times
+>>> assert len(times) == fIn.nFields
+>>> tEnd, uEnd = fIn.readField(-1)
+>>> assert tEnd == times[-1]
+
+Notes
+-----
+🚀 :class:`Cart1D` and :class:`Cart2D` are compatible with an MPI-based cartesian decomposition.
+See :class:`pySDC.tests.test_helpers.test_fieldsIO.writeFields_MPI` for an illustrative example.
+
+Warning
+-------
+To use MPI collective writing, you first need to call the class methods :class:`Cart1D.setupMPI`
+or :class:`Cart2D.setupMPI` from the associated class (cf. their docstrings).
+Also, their associated `setHeader` methods **must be given the global grid coordinates**,
+whether the code is run in parallel or not.
+"""
+import os
+import numpy as np
+from typing import Type, TypeVar
+
+T = TypeVar("T")
+
+try:
+    try:
+        import dolfin as df  # noqa: F841 (for some reason, dolfin always needs to be imported before mpi4py)
+    except ImportError:
+        pass
+    from mpi4py import MPI
+except ImportError:
+
+    class MPI:
+        COMM_WORLD = None
+        Intracomm = T
+
+
+# Supported data types
+DTYPES = {
+    0: np.float64,  # double precision
+    1: np.complex128,
+    2: np.float128,  # quadruple precision
+    3: np.complex256,
+    4: np.float32,  # single precision
+    5: np.complex64,
+}
+DTYPES_AVAIL = {val: key for key, val in DTYPES.items()}
+
+# Header dtype
+H_DTYPE = np.int8
+T_DTYPE = np.float64
+
+
+class FieldsIO:
+    """Abstract IO file handler"""
+
+    STRUCTS = {}  # supported structures, modified dynamically
+    sID = None  # structure ID of the FieldsIO class, modified dynamically
+
+    tSize = T_DTYPE().itemsize
+
+    ALLOW_OVERWRITE = False
+
+    def __init__(self, dtype, fileName):
+        """
+        Parameters
+        ----------
+        dtype : np.dtype
+            The data type of the field values.
+        fileName : str
+            Name of the binary file.
+        """
+        assert dtype in DTYPES_AVAIL, f"{dtype=} not available"
+        self.dtype = dtype
+        self.fileName = fileName
+        self.initialized = False
+
+        # Initialized by the setHeader abstract method
+        self.header = None
+        self.nItems = None  # number of values (dof) stored into one field
+
+    def __str__(self):
+        return f"FieldsIO[{self.__class__.__name__}|{self.dtype.__name__}|file:{self.fileName}]<{hex(id(self))}>"
+
+    def __repr__(self):
+        return self.__str__()
+
+    @classmethod
+    def fromFile(cls, fileName):
+        """
+        Read a file storing fields, and return the `FieldsIO` of the appropriate
+        field type (structure).
+
+        Parameters
+        ----------
+        fileName : str
+            Name of the binary file.
+
+        Returns
+        -------
+        fieldsIO : :class:`FieldsIO`
+            The specialized `FieldsIO` adapted to the file.
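+
+        Example
+        -------
+        Sketch assuming ``file.pysdc`` was written beforehand (e.g. with :class:`Cart1D`) :
+
+        >>> fieldsIO = FieldsIO.fromFile("file.pysdc")
+        >>> times = fieldsIO.times
+        >>> tEnd, uEnd = fieldsIO.readField(-1)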
+ """ + assert os.path.isfile(fileName), f"not a file ({fileName})" + with open(fileName, "rb") as f: + STRUCT, DTYPE = np.fromfile(f, dtype=H_DTYPE, count=2) + fieldsIO: FieldsIO = cls.STRUCTS[STRUCT](DTYPES[DTYPE], fileName) + fieldsIO.readHeader(f) + fieldsIO.initialized = True + return fieldsIO + + @property + def hBase(self) -> np.ndarray: + """Base header into numpy array format""" + return np.array([self.sID, DTYPES_AVAIL[self.dtype]], dtype=H_DTYPE) + + @classmethod + def register(cls, sID): + """ + Decorator used to register a new class FieldsIO specialized class + + Parameters + ---------- + sID : int + Unique identifyer for the file, used in the binary file. + Since it's encoded on a 8-bytes signed integer, + it must be between -128 and 127 + + Example + ------- + >>> # New specialized FieldsIO class + >>> @FieldsIO.register(sID=31) + >>> class HexaMesh2D(FieldsIO): + >>> pass # ... implementation + """ + + def wrapper(registered: Type[T]) -> Type[T]: + assert ( + sID not in cls.STRUCTS + ), f"struct ID already taken by {cls.STRUCTS[sID]}, cannot use it for {registered}" + cls.STRUCTS[sID] = registered + registered.sID = sID + return registered + + return wrapper + + def initialize(self): + """Initialize the file handler : create the file with header, removing any existing file with the same name""" + assert self.header is not None, "header must be set before initializing FieldsIO" + assert not self.initialized, "FieldsIO already initialized" + + if not self.ALLOW_OVERWRITE: + assert not os.path.isfile( + self.fileName + ), "file already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting" + + with open(self.fileName, "w+b") as f: + self.hBase.tofile(f) + for array in self.hInfos: + array.tofile(f) + self.initialized = True + + def setHeader(self, **params): + """(Abstract) Set the header before creating a new file to store the fields""" + raise NotImplementedError() + + @property + def hInfos(self) -> list[np.ndarray]: + """(Abstract) Array representing the grid structure to be written in the binary file.""" + raise NotImplementedError() + + def readHeader(self, f): + """ + (Abstract) Read the header from the file storing the fields. + + Parameters + ---------- + f : `_io.TextIOWrapper` + File to read the header from. + """ + raise NotImplementedError() + + @property + def hSize(self): + """Size of the full header (in bytes)""" + return self.hBase.nbytes + sum(hInfo.nbytes for hInfo in self.hInfos) + + @property + def itemSize(self): + """Size of one field value (in bytes)""" + return self.dtype().itemsize + + @property + def fSize(self): + """Full size of a field (in bytes)""" + return self.nItems * self.itemSize + + @property + def fileSize(self): + """Current size of the file (in bytes)""" + return os.path.getsize(self.fileName) + + def addField(self, time, field): + """ + Append one field solution at the end of the file with one given time. + + Parameters + ---------- + time : float-like + The associated time of the field solution. + field : np.ndarray + The field values. 
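+
+        Note
+        ----
+        Each call appends one record of ``tSize + fSize`` bytes to the file : the time stamp
+        (`T_DTYPE`, 8 bytes) followed by the ``nItems`` flattened field values. As an
+        illustrative example, a :class:`Scal0D` file with ``nVar=2`` variables in ``np.float64``
+        stores ``8 + 2*8 = 24`` bytes per field.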
+ """ + assert self.initialized, "cannot add field to a non initialized FieldsIO" + field = np.asarray(field) + assert field.dtype == self.dtype, f"expected {self.dtype} dtype, got {field.dtype}" + assert field.size == self.nItems, f"expected {self.nItems} values, got {field.size}" + with open(self.fileName, "ab") as f: + np.array(time, dtype=T_DTYPE).tofile(f) + field.tofile(f) + + @property + def nFields(self): + """Number of fields currently stored in the binary file""" + return int((self.fileSize - self.hSize) // (self.tSize + self.fSize)) + + def formatIndex(self, idx): + """Utility method to format a fields index to a positional integer (negative starts from last field index, like python lists)""" + nFields = self.nFields + if idx < 0: + idx = nFields + idx + assert idx < nFields, f"cannot read index {idx} from {nFields} fields" + assert idx >= 0, f"cannot read index {idx-nFields} from {nFields} fields" + return idx + + @property + def times(self): + """Vector of all times stored in the binary file""" + times = [] + with open(self.fileName, "rb") as f: + f.seek(self.hSize) + for i in range(self.nFields): + t = np.fromfile(f, dtype=T_DTYPE, count=1, offset=0 if i == 0 else self.fSize)[0] + times.append(float(t)) + return times + + def time(self, idx): + """Time stored at a given field index""" + idx = self.formatIndex(idx) + offset = self.hSize + idx * (self.tSize + self.fSize) + with open(self.fileName, "rb") as f: + t = np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset)[0] + return float(t) + + def readField(self, idx): + """ + Read one field stored in the binary file, corresponding to the given + time index. + + Parameters + ---------- + idx : int + Positional index of the field. + + Returns + ------- + t : float + Stored time for this field. + field : np.ndarray + Read fields in a numpy array. + """ + idx = self.formatIndex(idx) + offset = self.hSize + idx * (self.tSize + self.fSize) + with open(self.fileName, "rb") as f: + f.seek(offset) + t = float(np.fromfile(f, dtype=T_DTYPE, count=1)[0]) + field = np.fromfile(f, dtype=self.dtype, count=self.nItems) + self.reshape(field) + return t, field + + def reshape(self, field): + """Eventually reshape the field to correspond to the grid structure""" + pass + + +@FieldsIO.register(sID=0) +class Scal0D(FieldsIO): + """FieldsIO handler storing a given number of scalar""" + + # ------------------------------------------------------------------------- + # Overriden methods + # ------------------------------------------------------------------------- + def setHeader(self, nVar): + """ + Set the descriptive grid structure to be stored in the file header. + + Parameters + ---------- + nVar : int + Number of scalar variable stored. + """ + self.header = {"nVar": int(nVar)} + self.nItems = self.nVar + + @property + def hInfos(self): + """Array representing the grid structure to be written in the binary file.""" + return [np.array([self.nVar], dtype=np.int64)] + + def readHeader(self, f): + """ + Read the header from the binary file storing the fields. + + Parameters + ---------- + f : `_io.TextIOWrapper` + File to read the header from. 
+        """
+        (nVar,) = np.fromfile(f, dtype=np.int64, count=1)
+        self.setHeader(nVar)
+
+    # -------------------------------------------------------------------------
+    # Class specifics
+    # -------------------------------------------------------------------------
+    @property
+    def nVar(self):
+        """Number of variables in a field, as described in the header"""
+        return self.header["nVar"]
+
+
+@FieldsIO.register(sID=1)
+class Cart1D(Scal0D):
+    """FieldsIO handler storing a given number of 1D scalar variables"""
+
+    # -------------------------------------------------------------------------
+    # Overriden methods
+    # -------------------------------------------------------------------------
+    def setHeader(self, nVar, coordX):
+        """
+        Set the descriptive grid structure to be stored in the file header.
+
+        Parameters
+        ----------
+        nVar : int
+            Number of 1D variables stored.
+        coordX : np.1darray
+            The grid coordinates in X direction.
+
+        Note
+        ----
+        When used in MPI decomposition mode, `coordX` **must** be the global grid.
+        """
+        coords = self.setupCoords(coordX=coordX)
+        self.header = {"nVar": int(nVar), **coords}
+        self.nItems = nVar * self.nX
+
+    @property
+    def hInfos(self):
+        """Array representing the grid structure to be written in the binary file."""
+        return [np.array([self.nVar, self.nX], dtype=np.int64), np.array(self.header["coordX"], dtype=np.float64)]
+
+    def readHeader(self, f):
+        """
+        Read the header from the binary file storing the fields.
+
+        Parameters
+        ----------
+        f : `_io.TextIOWrapper`
+            File to read the header from.
+        """
+        nVar, nX = np.fromfile(f, dtype=np.int64, count=2)
+        coordX = np.fromfile(f, dtype=np.float64, count=nX)
+        self.setHeader(nVar, coordX)
+
+    def reshape(self, fields: np.ndarray):
+        fields.shape = (self.nVar, self.nX)
+
+    # -------------------------------------------------------------------------
+    # Class specifics
+    # -------------------------------------------------------------------------
+    @property
+    def nX(self):
+        """Number of points in x direction"""
+        return self.header["coordX"].size
+
+    @staticmethod
+    def setupCoords(**coords):
+        """Utility function to set up grids in multiple dimensions, given the keyword arguments"""
+        coords = {name: np.asarray(coord, dtype=np.float64) for name, coord in coords.items()}
+        for name, coord in coords.items():
+            assert coord.ndim == 1, f"{name} must be one dimensional"
+        return coords
+
+    # -------------------------------------------------------------------------
+    # MPI-parallel implementation
+    # -------------------------------------------------------------------------
+    comm: MPI.Intracomm = None
+
+    @classmethod
+    def setupMPI(cls, comm: MPI.Intracomm, iLocX, nLocX):
+        """
+        Set up the MPI mode for the file IO, considering a decomposition
+        of the 1D grid into contiguous subintervals.
+
+        Parameters
+        ----------
+        comm : MPI.Intracomm
+            The space decomposition communicator.
+        iLocX : int
+            Starting index of the local sub-domain in the global `coordX`.
+        nLocX : int
+            Number of points in the local sub-domain.
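+
+        Example
+        -------
+        Illustrative sketch (parameter values are hypothetical), mirroring what is done in
+        :class:`pySDC.tests.test_helpers.test_fieldsIO.writeFields_MPI` :
+
+        >>> from mpi4py import MPI
+        >>> from pySDC.helpers.blocks import BlockDecomposition
+        >>> comm = MPI.COMM_WORLD
+        >>> blocks = BlockDecomposition(comm.Get_size(), [256], gRank=comm.Get_rank())
+        >>> (iLocX,), (nLocX,) = blocks.localBounds
+        >>> Cart1D.setupMPI(comm, iLocX, nLocX)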
+ """ + cls.comm = comm + cls.iLocX = iLocX + cls.nLocX = nLocX + cls.mpiFile = None + + @property + def MPI_ON(self): + """Wether or not MPI is activated""" + if self.comm is None: + return False + return self.comm.Get_size() > 1 + + @property + def MPI_ROOT(self): + """Wether or not the process is MPI Root""" + if self.comm is None: + return True + return self.comm.Get_rank() == 0 + + def MPI_FILE_OPEN(self, mode): + """Open the binary file in MPI mode""" + amode = { + "r": MPI.MODE_RDONLY, + "a": MPI.MODE_WRONLY | MPI.MODE_APPEND, + }[mode] + self.mpiFile = MPI.File.Open(self.comm, self.fileName, amode) + + def MPI_WRITE(self, data): + """Write data (np.ndarray) in the binary file in MPI mode, at the current file cursor position.""" + self.mpiFile.Write(data) + + def MPI_WRITE_AT(self, offset, data: np.ndarray): + """ + Write data in the binary file in MPI mode, with a given offset + **relative to the beginning of the file**. + + Parameters + ---------- + offset : int + Offset to write at, relative to the beginning of the file, in bytes. + data : np.ndarray + Data to be written in the binary file. + """ + self.mpiFile.Write_at(offset, data) + + def MPI_READ_AT(self, offset, data): + """ + Read data from the binary file in MPI mode, with a given offset + **relative to the beginning of the file**. + + Parameters + ---------- + offset : int + Offset to read at, relative to the beginning of the file, in bytes. + data : np.ndarray + Array on which to read the data from the binary file. + """ + self.mpiFile.Read_at(offset, data) + + def MPI_FILE_CLOSE(self): + """Close the binary file in MPI mode""" + self.mpiFile.Close() + self.mpiFile = None + + def initialize(self): + """Initialize the binary file (write header) in MPI mode""" + if self.MPI_ROOT: + try: + super().initialize() + except AssertionError as e: + if self.MPI_ON: + print(f"{type(e)}: {e}") + self.comm.Abort() + else: + raise e + + if self.MPI_ON: + self.comm.Barrier() # Important, should not be removed ! + self.initialized = True + + def addField(self, time, field): + """ + Append one field solution at the end of the file with one given time, + possibly using MPI. + + Parameters + ---------- + time : float-like + The associated time of the field solution. + field : np.ndarray + The (local) field values. + + Note + ---- + If a MPI decomposition is used, field **must be** the local field values. + """ + if not self.MPI_ON: + return super().addField(time, field) + + assert self.initialized, "cannot add field to a non initialized FieldsIO" + + field = np.asarray(field) + assert field.dtype == self.dtype, f"expected {self.dtype} dtype, got {field.dtype}" + assert field.shape == (self.nVar, self.nLocX), f"expected {(self.nVar, self.nLocX)} shape, got {field.shape}" + + offset0 = self.fileSize + self.MPI_FILE_OPEN(mode="a") + if self.MPI_ROOT: + self.MPI_WRITE(np.array(time, dtype=T_DTYPE)) + offset0 += self.tSize + + for iVar in range(self.nVar): + offset = offset0 + (iVar * self.nX + self.iLocX) * self.itemSize + self.MPI_WRITE_AT(offset, field[iVar]) + self.MPI_FILE_CLOSE() + + def readField(self, idx): + """ + Read one field stored in the binary file, corresponding to the given + time index, possibly in MPI mode. + + Parameters + ---------- + idx : int + Positional index of the field. + + Returns + ------- + t : float + Stored time for this field. + field : np.ndarray + Read (local) fields in a numpy array. + + Note + ---- + If a MPI decomposition is used, it reads and returns the local fields values only. 
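+
+        Example
+        -------
+        Sketch assuming :meth:`setupMPI` was called beforehand; ``fIn`` denotes an already
+        initialized :class:`Cart1D` instance (hypothetical name) :
+
+        >>> t0, uLoc = fIn.readField(0)
+        >>> uLoc.shape == (fIn.nVar, fIn.nLocX)
+        True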
+        """
+        if not self.MPI_ON:
+            return super().readField(idx)
+        idx = self.formatIndex(idx)
+
+        offset0 = self.hSize + idx * (self.fSize + self.tSize)
+        with open(self.fileName, "rb") as f:
+            t = float(np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset0)[0])
+            offset0 += self.tSize
+
+        field = np.empty((self.nVar, self.nLocX), dtype=self.dtype)
+
+        self.MPI_FILE_OPEN(mode="r")
+        for iVar in range(self.nVar):
+            offset = offset0 + (iVar * self.nX + self.iLocX) * self.itemSize
+            self.MPI_READ_AT(offset, field[iVar])
+        self.MPI_FILE_CLOSE()
+
+        return t, field
+
+
+@FieldsIO.register(sID=2)
+class Cart2D(Cart1D):
+    """FieldsIO handler storing a given number of 2D scalar variables"""
+
+    # -------------------------------------------------------------------------
+    # Overriden methods
+    # -------------------------------------------------------------------------
+    def setHeader(self, nVar, coordX, coordY):
+        """
+        Set the descriptive grid structure to be stored in the file header.
+
+        Parameters
+        ----------
+        nVar : int
+            Number of 2D variables stored.
+        coordX : np.1darray
+            The grid coordinates in x direction.
+        coordY : np.1darray
+            The grid coordinates in y direction.
+
+        Note
+        ----
+        When used in MPI decomposition mode, `coordX` and `coordY` **must** be the global grids.
+        """
+        coords = self.setupCoords(coordX=coordX, coordY=coordY)
+        self.header = {"nVar": int(nVar), **coords}
+        self.nItems = nVar * self.nX * self.nY
+
+    @property
+    def hInfos(self):
+        """Array representing the grid structure to be written in the binary file."""
+        return [
+            np.array([self.nVar, self.nX, self.nY], dtype=np.int64),
+            np.array(self.header["coordX"], dtype=np.float64),
+            np.array(self.header["coordY"], dtype=np.float64),
+        ]
+
+    def readHeader(self, f):
+        """
+        Read the header from the binary file storing the fields.
+
+        Parameters
+        ----------
+        f : `_io.TextIOWrapper`
+            File to read the header from.
+        """
+        nVar, nX, nY = np.fromfile(f, dtype=np.int64, count=3)
+        coordX = np.fromfile(f, dtype=np.float64, count=nX)
+        coordY = np.fromfile(f, dtype=np.float64, count=nY)
+        self.setHeader(nVar, coordX, coordY)
+
+    def reshape(self, fields: np.ndarray):
+        """Reshape the fields to a [nVar, nX, nY] array (inplace operation)"""
+        fields.shape = (self.nVar, self.nX, self.nY)
+
+    # -------------------------------------------------------------------------
+    # Class specifics
+    # -------------------------------------------------------------------------
+    @property
+    def nY(self):
+        """Number of points in y direction"""
+        return self.header["coordY"].size
+
+    # -------------------------------------------------------------------------
+    # MPI-parallel implementation
+    # -------------------------------------------------------------------------
+    @classmethod
+    def setupMPI(cls, comm: MPI.Intracomm, iLocX, nLocX, iLocY, nLocY):
+        super().setupMPI(comm, iLocX, nLocX)
+        cls.iLocY = iLocY
+        cls.nLocY = nLocY
+
+    def addField(self, time, field):
+        """
+        Append one field solution at the end of the file with one given time,
+        possibly using MPI.
+
+        Parameters
+        ----------
+        time : float-like
+            The associated time of the field solution.
+        field : np.ndarray
+            The (local) field values.
+
+        Note
+        ----
+        If a MPI decomposition is used, field **must be** the local field values.
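+
+        Fields are stored flattened in C order with global shape ``[nVar, nX, nY]`` : each local
+        row ``field[iVar, iX]`` is written at byte offset
+        ``(iVar*nX*nY + (iLocX + iX)*nY + iLocY)*itemSize`` past the record's time stamp.
+        As an illustrative example, with ``nX = nY = 64``, ``itemSize = 8`` and a local block starting
+        at ``iLocX = iLocY = 32``, the row ``field[0, 0]`` lands ``(32*64 + 32)*8 = 16640`` bytes
+        after the time stamp.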
+ """ + if not self.MPI_ON: + return super().addField(time, field) + + assert self.initialized, "cannot add field to a non initialized FieldsIO" + + field = np.asarray(field) + assert field.dtype == self.dtype, f"expected {self.dtype} dtype, got {field.dtype}" + assert field.shape == ( + self.nVar, + self.nLocX, + self.nLocY, + ), f"expected {(self.nVar, self.nLocX, self.nLocY)} shape, got {field.shape}" + + offset0 = self.fileSize + self.MPI_FILE_OPEN(mode="a") + if self.MPI_ROOT: + self.MPI_WRITE(np.array(time, dtype=T_DTYPE)) + offset0 += self.tSize + + for iVar in range(self.nVar): + for iX in range(self.nLocX): + offset = offset0 + (iVar * self.nX * self.nY + (self.iLocX + iX) * self.nY + self.iLocY) * self.itemSize + self.MPI_WRITE_AT(offset, field[iVar, iX]) + self.MPI_FILE_CLOSE() + + def readField(self, idx): + """ + Read one field stored in the binary file, corresponding to the given + time index, eventually in MPI mode. + + Parameters + ---------- + idx : int + Positional index of the field. + + Returns + ------- + t : float + Stored time for this field. + field : np.ndarray + Read (local) fields in a numpy array. + + Note + ---- + If a MPI decomposition is used, it reads and returns the local fields values only. + """ + if not self.MPI_ON: + return super().readField(idx) + idx = self.formatIndex(idx) + + offset0 = self.hSize + idx * (self.tSize + self.fSize) + with open(self.fileName, "rb") as f: + t = float(np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset0)[0]) + offset0 += self.tSize + + field = np.empty((self.nVar, self.nLocX, self.nLocY), dtype=self.dtype) + + self.MPI_FILE_OPEN(mode="r") + for iVar in range(self.nVar): + for iX in range(self.nLocX): + offset = offset0 + (iVar * self.nX * self.nY + (self.iLocX + iX) * self.nY + self.iLocY) * self.itemSize + self.MPI_READ_AT(offset, field[iVar, iX]) + self.MPI_FILE_CLOSE() + + return t, field diff --git a/pySDC/tests/test_helpers/test_fieldsIO.py b/pySDC/tests/test_helpers/test_fieldsIO.py new file mode 100644 index 0000000000000000000000000000000000000000..5d5be0151c01f30651471da29dadc960be320d41 --- /dev/null +++ b/pySDC/tests/test_helpers/test_fieldsIO.py @@ -0,0 +1,374 @@ +import pytest +import numpy as np + +from pySDC.helpers.fieldsIO import DTYPES, FieldsIO + +FieldsIO.ALLOW_OVERWRITE = True + + +@pytest.mark.parametrize("dtypeIdx", DTYPES.keys()) +@pytest.mark.parametrize("nDim", range(3)) +def testHeader(nDim, dtypeIdx): + from pySDC.helpers.fieldsIO import FieldsIO, Scal0D, Cart1D, Cart2D + + fileName = "testHeader.pysdc" + dtype = DTYPES[dtypeIdx] + + coordX = np.linspace(0, 1, num=256, endpoint=False) + coordY = np.linspace(0, 1, num=64, endpoint=False) + + if nDim == 0: + Class = Scal0D + args = {"nVar": 20} + elif nDim == 1: + Class = Cart1D + args = {"nVar": 10, "coordX": coordX} + elif nDim == 2: + Class = Cart2D + args = {"nVar": 10, "coordX": coordX, "coordY": coordY} + + f1 = Class(dtype, fileName) + assert f1.__str__() == f1.__repr__(), "__repr__ and __str__ do not return the same result" + try: + f1.initialize() + except AssertionError: + pass + else: + raise AssertionError(f"{f1} should not be initialized without AssertionError before header is set") + + f1.setHeader(**args) + assert f1.header is not None, f"{f1} has still None for header after setHeader" + assert f1.nItems is not None, f"{f1} has still None for nItems after setHeader" + assert f1.nItems > 0, f"{f1} has nItems={f1.nItems} after setHeader" + try: + f1.addField(0, np.zeros(f1.nItems, dtype=f1.dtype)) + except AssertionError: + pass + 
else: + raise AssertionError(f"{f1} should not be initialized without error before header is set") + + f1.initialize() + assert f1.initialized, f"{f1} is not initialized after calling initialize()" + assert f1.fileSize == f1.hSize, f"{f1} has file size different than its header size" + + f2 = FieldsIO.fromFile(fileName) + assert f2.initialized, f"f2 ({f2}) not initialized after instantiating from file" + assert type(f2) == type(f1), f"f2 ({f2}) not of the same type as f1 ({f1})" + assert f2.dtype == f1.dtype, f"f2 ({f2}) has not the same dtype as f1 ({f1})" + + for key, val in f1.header.items(): + assert key in f2.header, f"could not read {key} key in written {f2}" + assert np.allclose(val, f2.header[key]), f"header's discrepancy for {key} in written {f2}" + + +@pytest.mark.parametrize("dtypeIdx", DTYPES.keys()) +@pytest.mark.parametrize("nSteps", [1, 2, 10, 100]) +@pytest.mark.parametrize("nVar", [1, 2, 5]) +def testScal0D(nVar, nSteps, dtypeIdx): + from pySDC.helpers.fieldsIO import FieldsIO, Scal0D + + fileName = "testScal0D.pysdc" + dtype = DTYPES[dtypeIdx] + + f1 = Scal0D(dtype, fileName) + f1.setHeader(nVar=nVar) + + assert f1.nItems == nVar, f"{f1} do not have nItems == nVar" + f1.initialize() + + u0 = np.random.rand(nVar).astype(f1.dtype) + times = np.arange(nSteps) / nSteps + + for t in times: + ut = (u0 * t).astype(f1.dtype) + f1.addField(t, ut) + + assert f1.nFields == nSteps, f"{f1} do not have nFields == nSteps" + assert np.allclose(f1.times, times), f"{f1} has wrong times stored in file" + + f2 = FieldsIO.fromFile(fileName) + + assert f1.nFields == f2.nFields, f"f2 ({f2}) has different nFields than f1 ({f1})" + assert f1.times == f2.times, f"f2 ({f2}) has different times than f1 ({f1})" + assert (f1.time(-1) == f2.times[-1]) and ( + f1.times[-1] == f2.time(-1) + ), f"f2 ({f2}) has different last time than f1 ({f1})" + + for idx, t in enumerate(times): + u1 = u0 * t + t2, u2 = f2.readField(idx) + assert t2 == t, f"{idx}'s fields in {f1} has incorrect time" + assert u2.shape == u1.shape, f"{idx}'s fields in {f1} has incorrect shape" + assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values" + + +@pytest.mark.parametrize("dtypeIdx", DTYPES.keys()) +@pytest.mark.parametrize("nSteps", [1, 2, 5, 10]) +@pytest.mark.parametrize("nX", [5, 10, 16, 32, 64]) +@pytest.mark.parametrize("nVar", [1, 2, 5]) +def testCart1D(nVar, nX, nSteps, dtypeIdx): + from pySDC.helpers.fieldsIO import FieldsIO, Cart1D, DTYPES + + fileName = "testCart1D.pysdc" + dtype = DTYPES[dtypeIdx] + + coordX = np.linspace(0, 1, num=nX, endpoint=False) + nX = coordX.size + + f1 = Cart1D(dtype, fileName) + f1.setHeader(nVar=nVar, coordX=coordX) + + assert f1.nItems == nVar * nX, f"{f1} do not have nItems == nVar*nX" + assert f1.nX == nX, f"{f1} has incorrect nX" + f1.initialize() + + u0 = np.random.rand(nVar, nX).astype(f1.dtype) + times = np.arange(nSteps) / nSteps + + for t in times: + ut = (u0 * t).astype(f1.dtype) + f1.addField(t, ut) + + assert f1.nFields == nSteps, f"{f1} do not have nFields == nSteps" + assert np.allclose(f1.times, times), f"{f1} has wrong times stored in file" + + f2 = FieldsIO.fromFile(fileName) + + assert f1.nFields == f2.nFields, f"f2 ({f2}) has different nFields than f1 ({f1})" + assert f1.times == f2.times, f"f2 ({f2}) has different times than f1 ({f1})" + assert (f1.time(-1) == f2.times[-1]) and ( + f1.times[-1] == f2.time(-1) + ), f"f2 ({f2}) has different last time than f1 ({f1})" + + for idx, t in enumerate(times): + u1 = u0 * t + t2, u2 = f2.readField(idx) + assert t2 
== t, f"{idx}'s fields in {f1} has incorrect time" + assert u2.shape == u1.shape, f"{idx}'s fields in {f1} has incorrect shape" + assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values" + + +@pytest.mark.parametrize("dtypeIdx", DTYPES.keys()) +@pytest.mark.parametrize("nSteps", [1, 2, 5, 10]) +@pytest.mark.parametrize("nY", [5, 10, 16]) +@pytest.mark.parametrize("nX", [5, 10, 16]) +@pytest.mark.parametrize("nVar", [1, 2, 5]) +def testCart2D(nVar, nX, nY, nSteps, dtypeIdx): + from pySDC.helpers.fieldsIO import FieldsIO, Cart2D, DTYPES + + fileName = "testCart2D.pysdc" + dtype = DTYPES[dtypeIdx] + + coordX = np.linspace(0, 1, num=nX, endpoint=False) + coordY = np.linspace(0, 1, num=nY, endpoint=False) + + f1 = Cart2D(dtype, fileName) + f1.setHeader(nVar=nVar, coordX=coordX, coordY=coordY) + + assert f1.nItems == nVar * nX * nY, f"{f1} do not have nItems == nVar*nX" + assert f1.nX == nX, f"{f1} has incorrect nX" + assert f1.nY == nY, f"{f1} has incorrect nY" + f1.initialize() + + u0 = np.random.rand(nVar, nX, nY).astype(f1.dtype) + times = np.arange(nSteps) / nSteps + + for t in times: + ut = (u0 * t).astype(f1.dtype) + f1.addField(t, ut) + + assert f1.nFields == nSteps, f"{f1} do not have nFields == nSteps" + assert np.allclose(f1.times, times), f"{f1} has wrong times stored in file" + + f2 = FieldsIO.fromFile(fileName) + + assert f1.nFields == f2.nFields, f"f2 ({f2}) has different nFields than f1 ({f1})" + assert f1.times == f2.times, f"f2 ({f2}) has different times than f1 ({f1})" + assert (f1.time(-1) == f2.times[-1]) and ( + f1.times[-1] == f2.time(-1) + ), f"f2 ({f2}) has different last time than f1 ({f1})" + + for idx, t in enumerate(times): + u1 = u0 * t + t2, u2 = f2.readField(idx) + assert t2 == t, f"{idx}'s fields in {f1} has incorrect time" + assert u2.shape == u1.shape, f"{idx}'s fields in {f1} has incorrect shape" + assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values" + + +def initGrid(nVar, nX, nY=None): + nDim = 1 + if nY is not None: + nDim += 1 + x = np.linspace(0, 1, num=nX, endpoint=False) + coords = (x,) + gridSizes = (nX,) + u0 = np.array(np.arange(nVar) + 1)[:, None] * x[None, :] + + if nDim > 1: + y = np.linspace(0, 1, num=nY, endpoint=False) + coords += (y,) + gridSizes += (nY,) + u0 = u0[:, :, None] * y[None, None, :] + + return coords, gridSizes, u0 + + +def writeFields_MPI(fileName, nDim, dtypeIdx, algo, nSteps, nVar, nX, nY=None): + coords, gridSizes, u0 = initGrid(nVar, nX, nY) + + from mpi4py import MPI + from pySDC.helpers.blocks import BlockDecomposition + from pySDC.helpers.fieldsIO import Cart1D, Cart2D + + comm = MPI.COMM_WORLD + MPI_SIZE = comm.Get_size() + MPI_RANK = comm.Get_rank() + + blocks = BlockDecomposition(MPI_SIZE, gridSizes, algo, MPI_RANK) + + if nDim == 1: + (iLocX,), (nLocX,) = blocks.localBounds + (pRankX,) = blocks.ranks + Cart1D.setupMPI(comm, iLocX, nLocX) + u0 = u0[:, iLocX : iLocX + nLocX] + + f1 = Cart1D(DTYPES[dtypeIdx], fileName) + f1.setHeader(nVar=nVar, coordX=coords[0]) + + if nDim == 2: + (iLocX, iLocY), (nLocX, nLocY) = blocks.localBounds + Cart2D.setupMPI(comm, iLocX, nLocX, iLocY, nLocY) + u0 = u0[:, iLocX : iLocX + nLocX, iLocY : iLocY + nLocY] + + f1 = Cart2D(DTYPES[dtypeIdx], fileName) + f1.setHeader(nVar=nVar, coordX=coords[0], coordY=coords[1]) + + u0 = np.asarray(u0, dtype=f1.dtype) + f1.initialize() + + times = np.arange(nSteps) / nSteps + for t in times: + ut = (u0 * t).astype(f1.dtype) + f1.addField(t, ut) + + return u0 + + +def compareFields_MPI(fileName, u0, nSteps): + from 
pySDC.helpers.fieldsIO import FieldsIO + + f2 = FieldsIO.fromFile(fileName) + + times = np.arange(nSteps) / nSteps + for idx, t in enumerate(times): + u1 = u0 * t + t2, u2 = f2.readField(idx) + assert t2 == t, f"{idx}'s fields in {f2} has incorrect time" + assert u2.shape == u1.shape, f"{idx}'s fields in {f2} has incorrect shape" + assert np.allclose(u2, u1), f"{idx}'s fields in {f2} has incorrect values" + + +@pytest.mark.mpi4py +@pytest.mark.parametrize("nX", [61, 16, 32]) +@pytest.mark.parametrize("nVar", [1, 4]) +@pytest.mark.parametrize("nSteps", [1, 10]) +@pytest.mark.parametrize("algo", ["ChatGPT", "Hybrid"]) +@pytest.mark.parametrize("dtypeIdx", [0, 1]) +@pytest.mark.parametrize("nProcs", [1, 2, 4]) +def testCart1D_MPI(nProcs, dtypeIdx, algo, nSteps, nVar, nX): + + import subprocess + + fileName = "testCart1D_MPI.pysdc" + + cmd = f"mpirun -np {nProcs} python {__file__} --fileName {fileName} --nDim 1 " + cmd += f"--dtypeIdx {dtypeIdx} --algo {algo} --nSteps {nSteps} --nVar {nVar} --nX {nX}" + + p = subprocess.Popen(cmd.split(), cwd=".") + p.wait() + assert p.returncode == 0, f"MPI write with {nProcs} did not return code 0, but {p.returncode}" + + from pySDC.helpers.fieldsIO import FieldsIO, Cart1D + + f2: Cart1D = FieldsIO.fromFile(fileName) + + assert type(f2) == Cart1D, f"incorrect type in MPI written fields {f2}" + assert f2.nFields == nSteps, f"incorrect nFields in MPI written fields {f2}" + assert f2.nVar == nVar, f"incorrect nVar in MPI written fields {f2}" + assert f2.nX == nX, f"incorrect nX in MPI written fields {f2}" + + coords, _, u0 = initGrid(nVar, nX) + assert np.allclose(f2.header['coordX'], coords[0]), f"incorrect coordX in MPI written fields {f2}" + + times = np.arange(nSteps) / nSteps + for idx, t in enumerate(times): + u1 = u0 * t + t2, u2 = f2.readField(idx) + assert t2 == t, f"{idx}'s fields in {f2} has incorrect time" + assert u2.shape == u1.shape, f"{idx}'s fields in {f2} has incorrect shape" + assert np.allclose(u2, u1), f"{idx}'s fields in {f2} has incorrect values" + + +@pytest.mark.mpi4py +@pytest.mark.parametrize("nY", [61, 16, 32]) +@pytest.mark.parametrize("nX", [61, 16, 32]) +@pytest.mark.parametrize("nVar", [1, 4]) +@pytest.mark.parametrize("nSteps", [1, 10]) +@pytest.mark.parametrize("algo", ["ChatGPT", "Hybrid"]) +@pytest.mark.parametrize("dtypeIdx", [0, 1]) +@pytest.mark.parametrize("nProcs", [1, 2, 4]) +def testCart2D_MPI(nProcs, dtypeIdx, algo, nSteps, nVar, nX, nY): + + import subprocess + + fileName = "testCart2D_MPI.pysdc" + + cmd = f"mpirun -np {nProcs} python {__file__} --fileName {fileName} --nDim 2 " + cmd += f"--dtypeIdx {dtypeIdx} --algo {algo} --nSteps {nSteps} --nVar {nVar} --nX {nX} --nY {nY}" + + p = subprocess.Popen(cmd.split(), cwd=".") + p.wait() + assert p.returncode == 0, f"MPI write with {nProcs} did not return code 0, but {p.returncode}" + + from pySDC.helpers.fieldsIO import FieldsIO, Cart2D + + f2: Cart2D = FieldsIO.fromFile(fileName) + + assert type(f2) == Cart2D, f"incorrect type in MPI written fields {f2}" + assert f2.nFields == nSteps, f"incorrect nFields in MPI written fields {f2}" + assert f2.nVar == nVar, f"incorrect nVar in MPI written fields {f2}" + assert f2.nX == nX, f"incorrect nX in MPI written fields {f2}" + assert f2.nY == nY, f"incorrect nY in MPI written fields {f2}" + + grids, _, u0 = initGrid(nVar, nX, nY) + assert np.allclose(f2.header['coordX'], grids[0]), f"incorrect coordX in MPI written fields {f2}" + assert np.allclose(f2.header['coordY'], grids[1]), f"incorrect coordY in MPI written fields {f2}" + + 
times = np.arange(nSteps) / nSteps + for idx, t in enumerate(times): + u1 = u0 * t + t2, u2 = f2.readField(idx) + assert t2 == t, f"{idx}'s fields in {f2} has incorrect time" + assert u2.shape == u1.shape, f"{idx}'s fields in {f2} has incorrect shape" + assert np.allclose(u2, u1), f"{idx}'s fields in {f2} has incorrect values" + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument('--fileName', type=str, help='fileName of the file') + parser.add_argument('--nDim', type=int, help='space dimension', choices=[1, 2]) + parser.add_argument('--dtypeIdx', type=int, help="dtype index", choices=DTYPES.keys()) + parser.add_argument( + '--algo', type=str, help="algorithm used for block decomposition", choices=["ChatGPT", "Hybrid"] + ) + parser.add_argument('--nSteps', type=int, help="number of field variables") + parser.add_argument('--nVar', type=int, help="number of field variables") + parser.add_argument('--nX', type=int, help="number of grid points in x dimension") + parser.add_argument('--nY', type=int, help="number of grid points in y dimension") + args = parser.parse_args() + + u0 = writeFields_MPI(**args.__dict__) + compareFields_MPI(args.fileName, u0, args.nSteps)