diff --git a/pySDC/helpers/firedrake_ensemble_communicator.py b/pySDC/helpers/firedrake_ensemble_communicator.py index b10e79d027b0563869381f256e54dda4d1d244bb..b08d1dc5453f94189cc5a0d0b70cfa71b83067e4 100644 --- a/pySDC/helpers/firedrake_ensemble_communicator.py +++ b/pySDC/helpers/firedrake_ensemble_communicator.py @@ -21,6 +21,10 @@ class FiredrakeEnsembleCommunicator: ensemble (firedrake.Ensemble): Ensemble communicator """ self.ensemble = fd.Ensemble(comm, space_size) + self.comm_wold = comm + + def Split(self, *args, **kwargs): + return FiredrakeEnsembleCommunicator(self.comm_wold.Split(*args, **kwargs), space_size=self.space_comm.size) @property def space_comm(self): @@ -53,6 +57,16 @@ class FiredrakeEnsembleCommunicator: else: self.ensemble.bcast(buf, root=root) + def Irecv(self, buf, source, tag=MPI.ANY_TAG): + if type(buf) in [np.ndarray, list]: + return self.ensemble.ensemble_comm.Irecv(buf=buf, source=source, tag=tag) + return self.ensemble.irecv(buf, source, tag=tag)[0] + + def Isend(self, buf, dest, tag=MPI.ANY_TAG): + if type(buf) in [np.ndarray, list]: + return self.ensemble.ensemble_comm.Isend(buf=buf, dest=dest, tag=tag) + return self.ensemble.isend(buf, dest, tag=tag)[0] + def get_ensemble(comm, space_size): return fd.Ensemble(comm, space_size) diff --git a/pySDC/helpers/pySDC_as_gusto_time_discretization.py b/pySDC/helpers/pySDC_as_gusto_time_discretization.py index 80c686e69602bcb1f8c3e101562e479a20d2f78a..b7f7c111191a0b7b3c92c7b9023e91751186347e 100644 --- a/pySDC/helpers/pySDC_as_gusto_time_discretization.py +++ b/pySDC/helpers/pySDC_as_gusto_time_discretization.py @@ -4,10 +4,14 @@ from gusto.time_discretisation.time_discretisation import TimeDiscretisation, wr from gusto.core.labels import explicit from pySDC.implementations.controller_classes.controller_nonMPI import controller_nonMPI +from pySDC.implementations.controller_classes.controller_MPI import controller_MPI from pySDC.implementations.problem_classes.GenericGusto import GenericGusto, GenericGustoImex from pySDC.core.hooks import Hooks from pySDC.helpers.stats_helper import get_sorted +import logging +import numpy as np + class LogTime(Hooks): """ @@ -34,6 +38,10 @@ class pySDC_integrator(TimeDiscretisation): It will construct a pySDC controller which can be used by itself and will be used within the time step when called from Gusto. Access the controller via `pySDC_integrator.controller`. This class also has `pySDC_integrator.stats`, which gathers all of the pySDC stats recorded in the hooks during every time step when used within Gusto. + + This class supports subcycling with multi-step SDC. You can use pseudo-parallelism by simply giving `n_steps` > 1 or + do proper parallelism by giving a `controller_communicator` of kind `pySDC.FiredrakeEnsembleCommunicator` with the + appropriate size. You also have to toggle between pseudo and proper parallelism with `useMPIController`. """ def __init__( @@ -44,8 +52,10 @@ class pySDC_integrator(TimeDiscretisation): field_name=None, solver_parameters=None, options=None, - t0=0, imex=False, + useMPIController=False, + n_steps=1, + controller_communicator=None, ): """ Initialization @@ -63,6 +73,10 @@ class pySDC_integrator(TimeDiscretisation): options to either be passed to the spatial discretisation, or to control the "wrapper" methods, such as Embedded DG or a recovery method. Defaults to None. + imex (bool): Whether to use IMEX splitting + useMPIController (bool): Whether to use the pseudo-parallel or proper parallel pySDC controller + n_steps (int): Number of steps done in parallel when using pseudo-parallel pySDC controller + controller_communicator (pySDC.FiredrakeEnsembleCommunicator, optional): Communicator for the proper parallel controller """ self._residual = None @@ -79,6 +93,23 @@ class pySDC_integrator(TimeDiscretisation): self.timestepper = None self.dt_next = None self.imex = imex + self.useMPIController = useMPIController + self.controller_communicator = controller_communicator + + if useMPIController: + assert ( + type(self.controller_communicator).__name__ == 'FiredrakeEnsembleCommunicator' + ), f'Need to give a FiredrakeEnsembleCommunicator here, not {type(self.controller_communicator)}' + if n_steps > 1: + logging.getLogger(type(self).__name__).warning( + f'Warning: You selected {n_steps=}, which will be ignored when using the MPI controller!' + ) + assert ( + controller_communicator is not None + ), 'You need to supply a communicator when using the MPI controller!' + self.n_steps = controller_communicator.size + else: + self.n_steps = n_steps def setup(self, equation, apply_bcs=True, *active_labels): super().setup(equation, apply_bcs, *active_labels) @@ -96,7 +127,7 @@ class pySDC_integrator(TimeDiscretisation): 'residual': self._residual, **self.description['problem_params'], } - self.description['level_params']['dt'] = float(self.domain.dt) + self.description['level_params']['dt'] = float(self.domain.dt) / self.n_steps # add utility hook required for step size adaptivity hook_class = self.controller_params.get('hook_class', []) @@ -106,7 +137,17 @@ class pySDC_integrator(TimeDiscretisation): self.controller_params['hook_class'] = hook_class # prepare controller and variables - self.controller = controller_nonMPI(1, description=self.description, controller_params=self.controller_params) + if self.useMPIController: + self.controller = controller_MPI( + comm=self.controller_communicator, + description=self.description, + controller_params=self.controller_params, + ) + else: + self.controller = controller_nonMPI( + self.n_steps, description=self.description, controller_params=self.controller_params + ) + self.prob = self.level.prob self.sweeper = self.level.sweep self.x0_pySDC = self.prob.dtype_u(self.prob.init) @@ -125,14 +166,26 @@ class pySDC_integrator(TimeDiscretisation): def residual(self, value): """Make sure the pySDC problem residual and this residual are the same""" if hasattr(self, 'prob'): - self.prob.residual = value + if self.useMPIController: + self.controller.S.levels[0].prob.residual = value + else: + for S in self.controller.MS: + S.levels[0].prob.residual = value else: self._residual = value + @property + def step(self): + """Get the first step on the controller""" + if self.useMPIController: + return self.controller.S + else: + return self.controller.MS[0] + @property def level(self): """Get the finest pySDC level""" - return self.controller.MS[0].levels[0] + return self.step.levels[0] @wrapper_apply def apply(self, x_out, x_in): @@ -144,29 +197,31 @@ class pySDC_integrator(TimeDiscretisation): x_in (:class:`Function`): the input field. """ self.x0_pySDC.functionspace.assign(x_in) - assert self.level.params.dt == float(self.dt), 'Step sizes have diverged between pySDC and Gusto' + assert np.isclose( + self.level.params.dt * self.n_steps, float(self.dt) + ), 'Step sizes have diverged between pySDC and Gusto' if self.dt_next is not None: assert ( self.timestepper is not None ), 'You need to set self.timestepper to the timestepper in order to facilitate adaptive step size selection here!' - self.timestepper.dt = fd.Constant(self.dt_next) + self.timestepper.dt = fd.Constant(self.dt_next * self.n_steps) self.t = self.timestepper.t uend, _stats = self.controller.run(u0=self.x0_pySDC, t0=float(self.t), Tend=float(self.t + self.dt)) # update time variables - if self.level.params.dt != float(self.dt): + if not np.isclose(self.level.params.dt * self.n_steps, float(self.dt)): self.dt_next = self.level.params.dt - self.t = get_sorted(_stats, type='_time', recomputed=False)[-1][1] + self.t = get_sorted(_stats, type='_time', recomputed=False, comm=self.controller_communicator)[-1][1] # update time of the Gusto stepper. # After this step, the Gusto stepper updates its time again to arrive at the correct time if self.timestepper is not None: self.timestepper.t = fd.Constant(self.t - self.dt) - self.dt = self.level.params.dt + self.dt = fd.Constant(self.level.params.dt * self.n_steps) # update stats and output self.stats = {**self.stats, **_stats} diff --git a/pySDC/implementations/datatype_classes/firedrake_mesh.py b/pySDC/implementations/datatype_classes/firedrake_mesh.py index 783b01c2e150cdad400bf87b137e622ae3e0d048..07e0000f7c37cf637f4712c42c26404998d7dee0 100644 --- a/pySDC/implementations/datatype_classes/firedrake_mesh.py +++ b/pySDC/implementations/datatype_classes/firedrake_mesh.py @@ -1,6 +1,7 @@ import firedrake as fd from pySDC.core.errors import DataError +from pySDC.helpers.firedrake_ensemble_communicator import FiredrakeEnsembleCommunicator class firedrake_mesh(object): @@ -77,6 +78,57 @@ class firedrake_mesh(object): return fd.norm(self.functionspace, 'L2') + def isend(self, dest=None, tag=None, comm=None): + """ + Routine for sending data forward in time (non-blocking) + + Args: + dest (int): target rank + tag (int): communication tag + comm: communicator + + Returns: + request handle + """ + assert ( + type(comm) == FiredrakeEnsembleCommunicator + ), f'Need to give a FiredrakeEnsembleCommunicator here, not {type(comm)}' + return comm.Isend(self.functionspace, dest=dest, tag=tag) + + def irecv(self, source=None, tag=None, comm=None): + """ + Routine for receiving in time + + Args: + source (int): source rank + tag (int): communication tag + comm: communicator + + Returns: + None + """ + assert ( + type(comm) == FiredrakeEnsembleCommunicator + ), f'Need to give a FiredrakeEnsembleCommunicator here, not {type(comm)}' + return comm.Irecv(self.functionspace, source=source, tag=tag) + + def bcast(self, root=None, comm=None): + """ + Routine for broadcasting values + + Args: + root (int): process with value to broadcast + comm: communicator + + Returns: + broadcasted values + """ + assert ( + type(comm) == FiredrakeEnsembleCommunicator + ), f'Need to give a FiredrakeEnsembleCommunicator here, not {type(comm)}' + comm.Bcast(self.functionspace, root=root) + return self + class IMEX_firedrake_mesh(object): """ diff --git a/pySDC/tests/test_datatypes/test_firedrake_mesh.py b/pySDC/tests/test_datatypes/test_firedrake_mesh.py index 781d765a354ddcf782b55574578895e5ddc3951a..ba9745794593b5a2fcdd094a205cd8b0839d9400 100644 --- a/pySDC/tests/test_datatypes/test_firedrake_mesh.py +++ b/pySDC/tests/test_datatypes/test_firedrake_mesh.py @@ -151,5 +151,86 @@ def test_rmul_rhs(n=3, v1=1, v2=2): assert np.allclose(b.expl.dat._numpy_data, v2 * v1) +def _test_p2p_communication(comm, u): + import numpy as np + + assert comm.size == 2 + if comm.rank == 0: + u.assign(3.14) + req = u.isend(dest=1, comm=comm, tag=0) + elif comm.rank == 1: + assert not np.allclose(u.dat._numpy_data, 3.14) + req = u.irecv(source=0, comm=comm, tag=0) + req.wait() + assert np.allclose(u.dat._numpy_data, 3.14) + + +def _test_bcast(comm, u): + import numpy as np + + if comm.rank == 0: + u.assign(3.14) + else: + assert not np.allclose(u.dat._numpy_data, 3.14) + u.bcast(root=0, comm=comm) + assert np.allclose(u.dat._numpy_data, 3.14) + + +@pytest.mark.firedrake +@pytest.mark.parametrize('pattern', ['p2p', 'bcast']) +def test_communication(pattern, n=2, submit=True): + if submit: + import os + import subprocess + + my_env = os.environ.copy() + my_env['COVERAGE_PROCESS_START'] = 'pyproject.toml' + cwd = '.' + num_procs = 2 + cmd = f'mpiexec -np {num_procs} python {__file__} --pattern {pattern}'.split() + + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=my_env, cwd=cwd) + p.wait() + for line in p.stdout: + print(line) + for line in p.stderr: + print(line) + assert p.returncode == 0, 'ERROR: did not get return code 0, got %s with %2i processes' % ( + p.returncode, + num_procs, + ) + + else: + import firedrake as fd + from pySDC.helpers.firedrake_ensemble_communicator import FiredrakeEnsembleCommunicator + from pySDC.implementations.datatype_classes.firedrake_mesh import firedrake_mesh + + ensemble_comm = FiredrakeEnsembleCommunicator(fd.COMM_WORLD, 1) + + mesh = fd.UnitSquareMesh(n, n, comm=ensemble_comm.space_comm) + V = fd.VectorFunctionSpace(mesh, "CG", 2) + + u = firedrake_mesh(V) + + if pattern == 'p2p': + _test_p2p_communication(ensemble_comm, u) + elif pattern == 'bcast': + _test_bcast(ensemble_comm, u) + else: + raise NotImplementedError + + if __name__ == '__main__': - test_addition() + from argparse import ArgumentParser + + parser = ArgumentParser() + parser.add_argument( + '--pattern', + help="pattern for parallel tests", + type=str, + default=None, + ) + args = parser.parse_args() + + if args.pattern: + test_communication(pattern=args.pattern, submit=False) diff --git a/pySDC/tests/test_helpers/test_gusto_coupling.py b/pySDC/tests/test_helpers/test_gusto_coupling.py index 4563d7a9c14a57b5380ff86fc623764bcd3ca585..29410d3e8ad527c5691864af4ccc0b467c67295a 100644 --- a/pySDC/tests/test_helpers/test_gusto_coupling.py +++ b/pySDC/tests/test_helpers/test_gusto_coupling.py @@ -1,19 +1,19 @@ import pytest -def get_gusto_stepper(eqns, method, spatial_methods): +def get_gusto_stepper(eqns, method, spatial_methods, dirname='./tmp'): from gusto import IO, OutputParameters, PrescribedTransport import sys if '--running-tests' not in sys.argv: sys.argv.append('--running-tests') - output = OutputParameters(dirname='./tmp', dumpfreq=15) + output = OutputParameters(dirname=dirname, dumpfreq=15) io = IO(method.domain, output) return PrescribedTransport(eqns, method, io, False, transport_method=spatial_methods) -def tracer_setup(tmpdir='./tmp', degree=1, small_dt=False): +def tracer_setup(tmpdir='./tmp', degree=1, small_dt=False, comm=None): from firedrake import ( IcosahedralSphereMesh, PeriodicIntervalMesh, @@ -23,6 +23,7 @@ def tracer_setup(tmpdir='./tmp', degree=1, small_dt=False): sqrt, exp, pi, + COMM_WORLD, ) from gusto import OutputParameters, Domain, IO from collections import namedtuple @@ -32,7 +33,8 @@ def tracer_setup(tmpdir='./tmp', degree=1, small_dt=False): TracerSetup.__new__.__defaults__ = (None,) * len(opts) radius = 1 - mesh = IcosahedralSphereMesh(radius=radius, refinement_level=3, degree=1) + comm = COMM_WORLD if comm is None else comm + mesh = IcosahedralSphereMesh(radius=radius, refinement_level=3, degree=1, comm=comm) x = SpatialCoordinate(mesh) # Parameters chosen so that dt != 1 @@ -615,9 +617,173 @@ def test_pySDC_integrator_with_adaptivity(dt_initial, setup): ), f'SDC does not match reference implementation with adaptive step size selection! Got relative difference of {error}' +@pytest.mark.firedrake +@pytest.mark.parametrize('n_steps', [1, 2, 4]) +@pytest.mark.parametrize('useMPIController', [True, False]) +def test_pySDC_integrator_MSSDC(n_steps, useMPIController, setup, submit=True): + if submit and useMPIController: + import os + import subprocess + + my_env = os.environ.copy() + my_env['COVERAGE_PROCESS_START'] = 'pyproject.toml' + cwd = '.' + cmd = f'mpiexec -np {n_steps} python {__file__} --test=MSSDC'.split() + + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=my_env, cwd=cwd) + p.wait() + for line in p.stdout: + print(line) + for line in p.stderr: + print(line) + assert p.returncode == 0, 'ERROR: did not get return code 0, got %s with %2i processes' % ( + p.returncode, + n_steps, + ) + return None + + from pySDC.implementations.controller_classes.controller_nonMPI import controller_nonMPI + from pySDC.helpers.pySDC_as_gusto_time_discretization import pySDC_integrator + from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit as sweeper_cls + from firedrake import norm, Constant, COMM_WORLD + import numpy as np + + MSSDC_args = {} + dirname = './tmp' + if useMPIController: + from pySDC.helpers.firedrake_ensemble_communicator import FiredrakeEnsembleCommunicator + + controller_communicator = FiredrakeEnsembleCommunicator(COMM_WORLD, 1) + assert controller_communicator.size == n_steps + MSSDC_args = {'useMPIController': True, 'controller_communicator': controller_communicator} + dirname = f'./tmp_{controller_communicator.rank}' + setup = tracer_setup(tmpdir=dirname, comm=controller_communicator.space_comm) + else: + MSSDC_args = {'useMPIController': False, 'n_steps': n_steps} + + method = BackwardEuler + use_transport_scheme = True + eqns, domain, spatial_methods, setup = get_gusto_advection_setup(use_transport_scheme, method.imex, setup) + + solver_parameters = { + 'snes_type': 'newtonls', + 'ksp_type': 'gmres', + 'pc_type': 'bjacobi', + 'sub_pc_type': 'ilu', + 'ksp_rtol': 1e-12, + 'snes_rtol': 1e-12, + 'ksp_atol': 1e-30, + 'snes_atol': 1e-30, + 'ksp_divtol': 1e30, + 'snes_divtol': 1e30, + 'snes_max_it': 99, + } + + # ------------------------------------------------------------------------ # + # Setup pySDC + # ------------------------------------------------------------------------ # + + level_params = dict() + level_params['restol'] = -1 + + step_params = dict() + step_params['maxiter'] = 1 + + sweeper_params = dict() + sweeper_params['quad_type'] = 'RADAU-RIGHT' + sweeper_params['node_type'] = 'LEGENDRE' + sweeper_params['num_nodes'] = 1 + sweeper_params['QI'] = 'IE' + sweeper_params['QE'] = 'PIC' + sweeper_params['initial_guess'] = 'copy' + + problem_params = dict() + + controller_params = dict() + controller_params['logger_level'] = 20 + controller_params['mssdc_jac'] = False + + description = dict() + description['problem_params'] = problem_params + description['sweeper_class'] = sweeper_cls + description['sweeper_params'] = sweeper_params + description['level_params'] = level_params + description['step_params'] = step_params + + # ------------------------------------------------------------------------ # + # Setup time steppers + # ------------------------------------------------------------------------ # + + tmax = float(domain.dt) * 2 * n_steps + + stepper_gusto = get_gusto_stepper( + eqns, + method.get_Gusto_method()(domain, solver_parameters=solver_parameters), + spatial_methods, + dirname=dirname, + ) + + domain.dt = Constant(domain.dt) * n_steps + + stepper_pySDC = get_gusto_stepper( + eqns, + pySDC_integrator( + description, + controller_params, + domain, + solver_parameters=solver_parameters, + imex=method.imex, + **MSSDC_args, + ), + spatial_methods, + dirname=dirname, + ) + + # ------------------------------------------------------------------------ # + # Get Initial conditions and run + # ------------------------------------------------------------------------ # + + for stepper in [stepper_gusto, stepper_pySDC]: + get_initial_conditions(stepper, setup) + stepper.run(t=0, tmax=tmax) + + # ------------------------------------------------------------------------ # + # Check results + # ------------------------------------------------------------------------ # + + assert stepper_gusto.t == stepper_pySDC.t + + error = norm(stepper_gusto.fields('f') - stepper_pySDC.fields('f')) / norm(stepper_gusto.fields('f')) + print(error) + + assert ( + error < solver_parameters['snes_rtol'] * 1e3 + ), f'pySDC and Gusto differ in method {method}! Got relative difference of {error}' + + if __name__ == '__main__': - setup = tracer_setup() - # test_generic_gusto_problem(setup) - # test_pySDC_integrator_RK(False, RK4, setup) - # test_pySDC_integrator(False, False, setup) - test_pySDC_integrator_with_adaptivity(1e-3, setup) + from mpi4py import MPI + from argparse import ArgumentParser + + if MPI.COMM_WORLD.size == 1: + setup = tracer_setup() + else: + setup = None + + parser = ArgumentParser() + parser.add_argument( + '--test', + help="which kind of test you want to run", + type=str, + default=None, + ) + args = parser.parse_args() + + if args.test == 'MSSDC': + test_pySDC_integrator_MSSDC(n_steps=MPI.COMM_WORLD.size, useMPIController=True, setup=setup, submit=False) + else: + # test_generic_gusto_problem(setup) + # test_pySDC_integrator_RK(False, RK4, setup) + # test_pySDC_integrator(False, False, setup) + test_pySDC_integrator_with_adaptivity(1e-3, setup) + # test_pySDC_integrator_MSSDC(2, False, setup)