Skip to content
Snippets Groups Projects
Commit 61116219 authored by lukas leufen's avatar lukas leufen
Browse files

Merge branch 'lukas_issue134_feat_run-module-registry' into 'develop'

Resolve "run module registry"

See merge request toar/machinelearningtools!111
parents 080d00fc c2913215
No related branches found
No related tags found
3 merge requests!125Release v0.10.0,!124Update Master to new version v0.10.0,!111Resolve "run module registry"
Pipeline #40054 passed
......@@ -112,7 +112,7 @@ timestamp=\`date +"%Y-%m-%d_%H%M-%S"\`
export PYTHONPATH=\${PWD}/venv_${hpcsys}/lib/python3.6/site-packages:\${PYTHONPATH}
srun python run.py --experiment_date=\$timestamp
srun python run_HPC.py --experiment_date=\$timestamp
EOT
fi
......
__author__ = "Lukas Leufen"
__date__ = '2019-11-14'
__date__ = '2020-06-29'
import argparse
from src.run_modules.experiment_setup import ExperimentSetup
from src.run_modules.partition_check import PartitionCheck
from src.run_modules.model_setup import ModelSetup
from src.run_modules.post_processing import PostProcessing
from src.run_modules.pre_processing import PreProcessing
from src.run_modules.run_environment import RunEnvironment
from src.run_modules.training import Training
from src.workflows import DefaultWorkflow
def main(parser_args):
    """Build and execute the default workflow.

    The diff view had concatenated the superseded RunEnvironment/stage
    sequence with the new workflow call; only the merged implementation,
    which delegates all stage handling to ``DefaultWorkflow``, is kept.

    :param parser_args: parsed command line namespace; all attributes
        (currently only ``experiment_date``) are forwarded as keyword
        arguments to ``DefaultWorkflow``.
    """
    workflow = DefaultWorkflow(**parser_args.__dict__)
    workflow.run()
if __name__ == "__main__":
    # Command line entry point: parse arguments and hand them to main().
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "--experiment_date", metavar="--exp_date", type=str, default="testrun",
        help="set experiment date as string")
    main(arg_parser.parse_args())
__author__ = "Lukas Leufen"
__date__ = '2020-06-29'
import argparse
from src.workflows import DefaultWorkflowHPC
def main(parser_args):
    """Create the HPC default workflow from the CLI namespace and execute it.

    :param parser_args: parsed command line namespace; its attributes are
        forwarded as keyword arguments to ``DefaultWorkflowHPC``.
    """
    hpc_workflow = DefaultWorkflowHPC(**vars(parser_args))
    hpc_workflow.run()
if __name__ == "__main__":
    # Command line entry point for the HPC run script.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--experiment_date", metavar="--exp_date", type=str, default="testrun",
        help="set experiment date as string")
    main(cli.parse_args())
......@@ -3,27 +3,13 @@ __date__ = '2019-11-14'
import argparse
from src.run_modules.experiment_setup import ExperimentSetup
from src.run_modules.model_setup import ModelSetup
from src.run_modules.post_processing import PostProcessing
from src.run_modules.pre_processing import PreProcessing
from src.run_modules.run_environment import RunEnvironment
from src.run_modules.training import Training
from src.workflows import DefaultWorkflow
def main(parser_args):
    """Run the default workflow on hourly sampled data.

    The diff view had concatenated the superseded RunEnvironment/stage
    sequence with the new workflow call; only the merged implementation is
    kept. Sampling and window size are fixed here; everything else comes
    from the command line.

    :param parser_args: parsed command line namespace, forwarded as keyword
        arguments to ``DefaultWorkflow``.
    """
    workflow = DefaultWorkflow(sampling="hourly", window_history_size=48, **parser_args.__dict__)
    workflow.run()
if __name__ == "__main__":
......@@ -31,5 +17,4 @@ if __name__ == "__main__":
parser.add_argument('--experiment_date', metavar='--exp_date', type=str, default=None,
help="set experiment date as string")
args = parser.parse_args(["--experiment_date", "testrun"])
main(args)
......@@ -11,6 +11,7 @@ from src.run_modules.post_processing import PostProcessing
from src.run_modules.pre_processing import PreProcessing
from src.run_modules.run_environment import RunEnvironment
from src.run_modules.training import Training
from src.workflows import DefaultWorkflowHPC
def load_stations():
......@@ -29,17 +30,9 @@ def load_stations():
def main(parser_args):
    """Run the HPC default workflow on the full station list.

    The diff view had concatenated the superseded RunEnvironment/stage
    sequence with the new workflow call; only the merged implementation,
    which passes ``load_stations()`` to ``DefaultWorkflowHPC``, is kept.

    :param parser_args: parsed command line namespace, forwarded as keyword
        arguments to ``DefaultWorkflowHPC``.
    """
    workflow = DefaultWorkflowHPC(stations=load_stations(), **parser_args.__dict__)
    workflow.run()
if __name__ == "__main__":
......
......@@ -5,7 +5,8 @@ __version_info__ = {
}
from src.run_modules import *
from src.run import run
from src.workflows import DefaultWorkflow, Workflow
def get_version():
......
......@@ -2,12 +2,7 @@ __author__ = "Lukas Leufen"
__date__ = '2020-06-25'
# Default station set. The diff view duplicated both the removed 51-station
# list and its replacement; only the merged (reduced) assignment is kept.
DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']

# Default statistic to request per variable (variable name -> statistic id).
DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
                        'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
                        'pblheight': 'maximum'}
......
__author__ = "Lukas Leufen"
__date__ = '2020-06-29'
from src.run_modules import *
import argparse
from src.workflows import DefaultWorkflow
import inspect
......@@ -29,16 +30,11 @@ def run(stations=None,
batch_size=None,
epochs=None):
params = inspect.getfullargspec(ExperimentSetup).args
params = inspect.getfullargspec(DefaultWorkflow).args
kwargs = {k: v for k, v in locals().items() if k in params and v is not None}
with RunEnvironment():
ExperimentSetup(**kwargs)
PreProcessing()
PartitionCheck()
ModelSetup()
Training()
PostProcessing()
workflow = DefaultWorkflow(**kwargs)
workflow.run()
if __name__ == "__main__":
......
from src.workflows.abstract_workflow import Workflow
from src.workflows.default_workflow import DefaultWorkflow, DefaultWorkflowHPC
\ No newline at end of file
"""Abstract workflow."""
__author__ = "Lukas Leufen"
__date__ = '2020-06-26'
from collections import OrderedDict
from src import RunEnvironment
class Workflow:
    """Abstract sequence of run modules (stages).

    An inheriting class first initialises this base class and then appends an
    arbitrary number of stages via :meth:`add`. Stages execute in exactly the
    order they were added; a single :meth:`run` call drives the whole
    sequence. Only execution is handled here — inter-stage dependencies are
    the caller's responsibility, and a workflow with unmet dependencies will
    most likely fail at run time.
    """

    def __init__(self):
        # Maps each stage class to the kwargs it will be instantiated with,
        # preserving insertion order.
        self._registry = OrderedDict()

    def add(self, stage, **kwargs):
        """Register *stage* together with the keyword arguments it receives."""
        self._registry[stage] = kwargs

    def run(self):
        """Execute all registered stages in order inside a run environment."""
        with RunEnvironment():
            for stage, stage_kwargs in self._registry.items():
                stage(**stage_kwargs)
"""Default workflow."""
__author__ = "Lukas Leufen"
__date__ = '2020-06-26'
import inspect
from src.helpers import remove_items
from src.run_modules import ExperimentSetup, PreProcessing, PartitionCheck, ModelSetup, Training, PostProcessing
from src.workflows.abstract_workflow import Workflow
class DefaultWorkflow(Workflow):
    """Default workflow running ExperimentSetup, PreProcessing, ModelSetup,
    Training and PostProcessing in exactly this order."""

    def __init__(self, stations=None,
                 station_type=None,
                 trainable=None, create_new_model=None,
                 window_history_size=None,
                 experiment_date="testrun",
                 network=None,
                 variables=None, statistics_per_var=None,
                 start=None, end=None,
                 target_var=None, target_dim=None,
                 window_lead_time=None,
                 dimensions=None,
                 interpolate_method=None, interpolate_dim=None, limit_nan_fill=None,
                 train_start=None, train_end=None, val_start=None, val_end=None, test_start=None, test_end=None,
                 use_all_stations_on_all_data_sets=None, fraction_of_train=None,
                 experiment_path=None, plot_path=None, forecast_path=None, bootstrap_path=None, overwrite_local_data=None,
                 sampling=None,
                 permute_data_on_training=None, extreme_values=None, extremes_on_right_tail_only=None,
                 transformation=None,
                 train_min_length=None, val_min_length=None, test_min_length=None,
                 evaluate_bootstraps=None, number_of_bootstraps=None, create_new_bootstraps=None,
                 plot_list=None,
                 model=None,
                 batch_size=None,
                 epochs=None):
        super().__init__()
        # Keep only the arguments the caller actually provided (non-None);
        # they are handed to ExperimentSetup unchanged.
        arg_names = remove_items(inspect.getfullargspec(self.__init__).args, "self")
        given_args = {name: value for name, value in locals().items()
                      if name in arg_names and value is not None}
        self._setup(**given_args)

    def _setup(self, **kwargs):
        """Register the default sequence of stages; kwargs feed ExperimentSetup only."""
        self.add(ExperimentSetup, **kwargs)
        for stage in (PreProcessing, ModelSetup, Training, PostProcessing):
            self.add(stage)
class DefaultWorkflowHPC(Workflow):
    """Default workflow for Jülich HPC systems running ExperimentSetup,
    PreProcessing, PartitionCheck, ModelSetup, Training and PostProcessing in
    exactly this order."""

    def __init__(self, stations=None,
                 station_type=None,
                 trainable=None, create_new_model=None,
                 window_history_size=None,
                 experiment_date="testrun",
                 network=None,
                 variables=None, statistics_per_var=None,
                 start=None, end=None,
                 target_var=None, target_dim=None,
                 window_lead_time=None,
                 dimensions=None,
                 interpolate_method=None, interpolate_dim=None, limit_nan_fill=None,
                 train_start=None, train_end=None, val_start=None, val_end=None, test_start=None, test_end=None,
                 use_all_stations_on_all_data_sets=None, fraction_of_train=None,
                 experiment_path=None, plot_path=None, forecast_path=None, bootstrap_path=None, overwrite_local_data=None,
                 sampling=None,
                 permute_data_on_training=None, extreme_values=None, extremes_on_right_tail_only=None,
                 transformation=None,
                 train_min_length=None, val_min_length=None, test_min_length=None,
                 evaluate_bootstraps=None, number_of_bootstraps=None, create_new_bootstraps=None,
                 plot_list=None,
                 model=None,
                 batch_size=None,
                 epochs=None):
        super().__init__()
        # Keep only the arguments the caller actually provided (non-None);
        # they are handed to ExperimentSetup unchanged.
        arg_names = remove_items(inspect.getfullargspec(self.__init__).args, "self")
        given_args = {name: value for name, value in locals().items()
                      if name in arg_names and value is not None}
        self._setup(**given_args)

    def _setup(self, **kwargs):
        """Register the HPC stage sequence (adds PartitionCheck after PreProcessing)."""
        self.add(ExperimentSetup, **kwargs)
        for stage in (PreProcessing, PartitionCheck, ModelSetup, Training, PostProcessing):
            self.add(stage)
......@@ -49,13 +49,7 @@ class TestExperimentSetup:
'u': 'average_values', 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu',
'cloudcover': 'average_values', 'pblheight': 'maximum'}
# setup for data
default_stations = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBY052', 'DEBY032', 'DEBW022',
'DEBY004', 'DEBY020', 'DEBW030', 'DEBW037', 'DEBW031', 'DEBW015', 'DEBW073', 'DEBY039',
'DEBW038', 'DEBW081', 'DEBY075', 'DEBW040', 'DEBY053', 'DEBW059', 'DEBW027', 'DEBY072',
'DEBW042', 'DEBW039', 'DEBY001', 'DEBY113', 'DEBY089', 'DEBW024', 'DEBW004', 'DEBY037',
'DEBW056', 'DEBW029', 'DEBY068', 'DEBW010', 'DEBW026', 'DEBY002', 'DEBY079', 'DEBW084',
'DEBY049', 'DEBY031', 'DEBW019', 'DEBW001', 'DEBY063', 'DEBY005', 'DEBW046', 'DEBW103',
'DEBW052', 'DEBW034', 'DEBY088', ]
default_stations = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']
assert data_store.get("stations", "general") == default_stations
assert data_store.get("network", "general") == "AIRBASE"
assert data_store.get("station_type", "general") == "background"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.