diff --git a/HPC_setup/create_runscripts_HPC.sh b/HPC_setup/create_runscripts_HPC.sh index 1322f47d69e213071b7aaaf4e93c24add8705b9d..bcbb5fb07800079736361450d7f0fed8684dc344 100755 --- a/HPC_setup/create_runscripts_HPC.sh +++ b/HPC_setup/create_runscripts_HPC.sh @@ -112,7 +112,7 @@ timestamp=\`date +"%Y-%m-%d_%H%M-%S"\` export PYTHONPATH=\${PWD}/venv_${hpcsys}/lib/python3.6/site-packages:\${PYTHONPATH} -srun python run.py --experiment_date=\$timestamp +srun python run_HPC.py --experiment_date=\$timestamp EOT fi diff --git a/run.py b/run.py index 2395064c25fcaf5fed1ede6a919b06bbc62f27e9..a9d8190628e1692c4b2812d3c8790bccd6b1b589 100644 --- a/run.py +++ b/run.py @@ -1,38 +1,19 @@ __author__ = "Lukas Leufen" -__date__ = '2019-11-14' +__date__ = '2020-06-29' import argparse - -from src.run_modules.experiment_setup import ExperimentSetup -from src.run_modules.partition_check import PartitionCheck -from src.run_modules.model_setup import ModelSetup -from src.run_modules.post_processing import PostProcessing -from src.run_modules.pre_processing import PreProcessing -from src.run_modules.run_environment import RunEnvironment -from src.run_modules.training import Training +from src.workflows import DefaultWorkflow def main(parser_args): - experiment_date = parser_args.experiment_date - with RunEnvironment(): - ExperimentSetup(stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW001'], - station_type='background', trainable=False, create_new_model=True, window_history_size=6, - experiment_date=experiment_date, create_new_bootstraps=False) - PreProcessing() - - PartitionCheck() - - ModelSetup() - Training() + workflow = DefaultWorkflow(**parser_args.__dict__) + workflow.run() - PostProcessing() if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--experiment_date', metavar='--exp_date', type=str, default="testrun", help="set experiment date as string") args = parser.parse_args() - main(args) - diff --git a/run_HPC.py b/run_HPC.py 
new file mode 100644 index 0000000000000000000000000000000000000000..fc2ead406469f0a254f5819e43c1e0d3542bb8d9 --- /dev/null +++ b/run_HPC.py @@ -0,0 +1,19 @@ +__author__ = "Lukas Leufen" +__date__ = '2020-06-29' + +import argparse +from src.workflows import DefaultWorkflowHPC + + +def main(parser_args): + + workflow = DefaultWorkflowHPC(**parser_args.__dict__) + workflow.run() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('--experiment_date', metavar='--exp_date', type=str, default="testrun", + help="set experiment date as string") + args = parser.parse_args() + main(args) diff --git a/run_hourly.py b/run_hourly.py index df3266405bc195cbe4c3546b4c7fd0c6b2925a84..682988f6f730d02be713c074dd63fc732e2868dc 100644 --- a/run_hourly.py +++ b/run_hourly.py @@ -3,27 +3,13 @@ __date__ = '2019-11-14' import argparse -from src.run_modules.experiment_setup import ExperimentSetup -from src.run_modules.model_setup import ModelSetup -from src.run_modules.post_processing import PostProcessing -from src.run_modules.pre_processing import PreProcessing -from src.run_modules.run_environment import RunEnvironment -from src.run_modules.training import Training +from src.workflows import DefaultWorkflow def main(parser_args): - experiment_date = parser_args.experiment_date - with RunEnvironment(): - ExperimentSetup(stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW001'], - station_type='background', trainable=True, sampling="hourly", window_history_size=48, - experiment_date=experiment_date) - PreProcessing() - ModelSetup() - - Training() - - PostProcessing() + workflow = DefaultWorkflow(sampling="hourly", window_history_size=48, **parser_args.__dict__) + workflow.run() if __name__ == "__main__": @@ -31,5 +17,4 @@ if __name__ == "__main__": parser.add_argument('--experiment_date', metavar='--exp_date', type=str, default=None, help="set experiment date as string") args = parser.parse_args(["--experiment_date", "testrun"]) - 
main(args) diff --git a/run_zam347.py b/run_zam347.py index 69b3cd6fee0e60212de9c74c2dd29b720bc81b81..2d351a8925e67b0bdfc010e92a3937435e160b2f 100644 --- a/run_zam347.py +++ b/run_zam347.py @@ -11,6 +11,7 @@ from src.run_modules.post_processing import PostProcessing from src.run_modules.pre_processing import PreProcessing from src.run_modules.run_environment import RunEnvironment from src.run_modules.training import Training +from src.workflows import DefaultWorkflowHPC def load_stations(): @@ -29,17 +30,9 @@ def load_stations(): def main(parser_args): - experiment_date = parser_args.experiment_date - with RunEnvironment(): - ExperimentSetup(stations=load_stations(), station_type='background', trainable=False, create_new_model=True, - experiment_date=experiment_date) - PreProcessing() - ModelSetup() - - Training() - - PostProcessing() + workflow = DefaultWorkflowHPC(stations=load_stations(), **parser_args.__dict__) + workflow.run() if __name__ == "__main__": diff --git a/src/__init__.py b/src/__init__.py index 9559822193d070bb886b59e2605bfc7aa73eef5e..5b7073ff042f6173fd78362f55d698eb6745552f 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -5,7 +5,8 @@ __version_info__ = { } from src.run_modules import * -from src.run import run +from src.workflows import DefaultWorkflow, Workflow + def get_version(): diff --git a/src/configuration/defaults.py b/src/configuration/defaults.py index 7ce96cfce515e7f32d98444e6a9542c9fbd7b4f4..0038bb5512d602150905f6504bcd5e135b127382 100644 --- a/src/configuration/defaults.py +++ b/src/configuration/defaults.py @@ -2,12 +2,7 @@ __author__ = "Lukas Leufen" __date__ = '2020-06-25' -DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBY052', 'DEBY032', 'DEBW022', 'DEBY004', - 'DEBY020', 'DEBW030', 'DEBW037', 'DEBW031', 'DEBW015', 'DEBW073', 'DEBY039', 'DEBW038', 'DEBW081', - 'DEBY075', 'DEBW040', 'DEBY053', 'DEBW059', 'DEBW027', 'DEBY072', 'DEBW042', 'DEBW039', 'DEBY001', - 'DEBY113', 'DEBY089', 'DEBW024', 
'DEBW004', 'DEBY037', 'DEBW056', 'DEBW029', 'DEBY068', 'DEBW010', - 'DEBW026', 'DEBY002', 'DEBY079', 'DEBW084', 'DEBY049', 'DEBY031', 'DEBW019', 'DEBW001', 'DEBY063', - 'DEBY005', 'DEBW046', 'DEBW103', 'DEBW052', 'DEBW034', 'DEBY088', ] +DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'] DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', 'pblheight': 'maximum'} diff --git a/src/run.py b/src/run.py index 900ebb47bc868b1364d10021e6b4fe8dc7186c7d..1494be0a75ebd1fd6a057d337ec2dcef8ed9c64f 100644 --- a/src/run.py +++ b/src/run.py @@ -1,6 +1,7 @@ +__author__ = "Lukas Leufen" +__date__ = '2020-06-29' -from src.run_modules import * -import argparse +from src.workflows import DefaultWorkflow import inspect @@ -29,16 +30,11 @@ def run(stations=None, batch_size=None, epochs=None): - params = inspect.getfullargspec(ExperimentSetup).args + params = inspect.getfullargspec(DefaultWorkflow).args kwargs = {k: v for k, v in locals().items() if k in params and v is not None} - with RunEnvironment(): - ExperimentSetup(**kwargs) - PreProcessing() - PartitionCheck() - ModelSetup() - Training() - PostProcessing() + workflow = DefaultWorkflow(**kwargs) + workflow.run() if __name__ == "__main__": diff --git a/src/workflows/__init__.py b/src/workflows/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..57e514cd9ced32fbf1dbb290b1008deffcec52d3 --- /dev/null +++ b/src/workflows/__init__.py @@ -0,0 +1,2 @@ +from src.workflows.abstract_workflow import Workflow +from src.workflows.default_workflow import DefaultWorkflow, DefaultWorkflowHPC \ No newline at end of file diff --git a/src/workflows/abstract_workflow.py b/src/workflows/abstract_workflow.py new file mode 100644 index 0000000000000000000000000000000000000000..5d4e62c8a2e409e865f43412a6757a9cb4e4b1f3 --- /dev/null +++ 
b/src/workflows/abstract_workflow.py @@ -0,0 +1,29 @@ +"""Abstract workflow.""" + +__author__ = "Lukas Leufen" +__date__ = '2020-06-26' + +from collections import OrderedDict + +from src import RunEnvironment + + +class Workflow: + """Abstract workflow class to handle a sequence of stages (run modules). An inheriting class has to first initialise + this parent class and can afterwards add an arbitrary number of stages by using the add method. The execution order + is equal to the order in which the stages have been added. To run the workflow, finally, a single call of the run + method is sufficient. Inter-stage dependencies must be taken care of by the user: this workflow class only handles the + execution but not the dependencies (the workflow would probably fail if they are not met).""" + + def __init__(self): + self._registry = OrderedDict() + + def add(self, stage, **kwargs): + """Add a new stage with optional kwargs.""" + self._registry[stage] = kwargs + + def run(self): + """Run workflow embedded in a run environment and according to the stages' ordering.""" + with RunEnvironment(): + for stage, kwargs in self._registry.items(): + stage(**kwargs) \ No newline at end of file diff --git a/src/workflows/default_workflow.py b/src/workflows/default_workflow.py new file mode 100644 index 0000000000000000000000000000000000000000..6a60c6ae60808b616fded3475179371763fd6feb --- /dev/null +++ b/src/workflows/default_workflow.py @@ -0,0 +1,98 @@ +"""Default workflow.""" + +__author__ = "Lukas Leufen" +__date__ = '2020-06-26' + +import inspect +from src.helpers import remove_items +from src.run_modules import ExperimentSetup, PreProcessing, PartitionCheck, ModelSetup, Training, PostProcessing +from src.workflows.abstract_workflow import Workflow + + +class DefaultWorkflow(Workflow): + """A default workflow executing ExperimentSetup, PreProcessing, ModelSetup, Training and PostProcessing in exactly + the mentioned order.""" + + def __init__(self, stations=None, + station_type=None, +
trainable=None, create_new_model=None, + window_history_size=None, + experiment_date="testrun", + network=None, + variables=None, statistics_per_var=None, + start=None, end=None, + target_var=None, target_dim=None, + window_lead_time=None, + dimensions=None, + interpolate_method=None, interpolate_dim=None, limit_nan_fill=None, + train_start=None, train_end=None, val_start=None, val_end=None, test_start=None, test_end=None, + use_all_stations_on_all_data_sets=None, fraction_of_train=None, + experiment_path=None, plot_path=None, forecast_path=None, bootstrap_path=None, overwrite_local_data=None, + sampling=None, + permute_data_on_training=None, extreme_values=None, extremes_on_right_tail_only=None, + transformation=None, + train_min_length=None, val_min_length=None, test_min_length=None, + evaluate_bootstraps=None, number_of_bootstraps=None, create_new_bootstraps=None, + plot_list=None, + model=None, + batch_size=None, + epochs=None): + super().__init__() + + # extract all given kwargs arguments + params = remove_items(inspect.getfullargspec(self.__init__).args, "self") + kwargs = {k: v for k, v in locals().items() if k in params and v is not None} + self._setup(**kwargs) + + def _setup(self, **kwargs): + """Set up default workflow.""" + self.add(ExperimentSetup, **kwargs) + self.add(PreProcessing) + self.add(ModelSetup) + self.add(Training) + self.add(PostProcessing) + + +class DefaultWorkflowHPC(Workflow): + """A default workflow for Jülich HPC systems executing ExperimentSetup, PreProcessing, PartitionCheck, ModelSetup, + Training and PostProcessing in exactly the mentioned order.""" + + def __init__(self, stations=None, + station_type=None, + trainable=None, create_new_model=None, + window_history_size=None, + experiment_date="testrun", + network=None, + variables=None, statistics_per_var=None, + start=None, end=None, + target_var=None, target_dim=None, + window_lead_time=None, + dimensions=None, + interpolate_method=None, interpolate_dim=None,
limit_nan_fill=None, + train_start=None, train_end=None, val_start=None, val_end=None, test_start=None, test_end=None, + use_all_stations_on_all_data_sets=None, fraction_of_train=None, + experiment_path=None, plot_path=None, forecast_path=None, bootstrap_path=None, overwrite_local_data=None, + sampling=None, + permute_data_on_training=None, extreme_values=None, extremes_on_right_tail_only=None, + transformation=None, + train_min_length=None, val_min_length=None, test_min_length=None, + evaluate_bootstraps=None, number_of_bootstraps=None, create_new_bootstraps=None, + plot_list=None, + model=None, + batch_size=None, + epochs=None): + super().__init__() + + # extract all given kwargs arguments + params = remove_items(inspect.getfullargspec(self.__init__).args, "self") + kwargs = {k: v for k, v in locals().items() if k in params and v is not None} + self._setup(**kwargs) + + def _setup(self, **kwargs): + """Set up default workflow.""" + self.add(ExperimentSetup, **kwargs) + self.add(PreProcessing) + self.add(PartitionCheck) + self.add(ModelSetup) + self.add(Training) + self.add(PostProcessing) diff --git a/test/test_modules/test_experiment_setup.py b/test/test_modules/test_experiment_setup.py index 2ecd78f0c7e2b14c0b9c64192c06014e2a0da75f..5b7d517e658de6bd71e1b4190bb5114dc005216e 100644 --- a/test/test_modules/test_experiment_setup.py +++ b/test/test_modules/test_experiment_setup.py @@ -49,13 +49,7 @@ class TestExperimentSetup: 'u': 'average_values', 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', 'pblheight': 'maximum'} # setup for data - default_stations = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBY052', 'DEBY032', 'DEBW022', - 'DEBY004', 'DEBY020', 'DEBW030', 'DEBW037', 'DEBW031', 'DEBW015', 'DEBW073', 'DEBY039', - 'DEBW038', 'DEBW081', 'DEBY075', 'DEBW040', 'DEBY053', 'DEBW059', 'DEBW027', 'DEBY072', - 'DEBW042', 'DEBW039', 'DEBY001', 'DEBY113', 'DEBY089', 'DEBW024', 'DEBW004', 'DEBY037', - 'DEBW056', 
'DEBW029', 'DEBY068', 'DEBW010', 'DEBW026', 'DEBY002', 'DEBY079', 'DEBW084', - 'DEBY049', 'DEBY031', 'DEBW019', 'DEBW001', 'DEBY063', 'DEBY005', 'DEBW046', 'DEBW103', - 'DEBW052', 'DEBW034', 'DEBY088', ] + default_stations = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'] assert data_store.get("stations", "general") == default_stations assert data_store.get("network", "general") == "AIRBASE" assert data_store.get("station_type", "general") == "background"