diff --git a/run.py b/run.py index 5e09269807aa239f21a3b59e3373ddc355dd667f..1579ae35f0d270dc0d2529cf1b6d36bc410e317a 100644 --- a/run.py +++ b/run.py @@ -4,7 +4,7 @@ __date__ = '2019-11-14' import logging import argparse -from src.experiment_setup import ExperimentSetup +from src.modules.experiment_setup import ExperimentSetup from src.modules import run, PreProcessing, Training, PostProcessing @@ -30,4 +30,6 @@ if __name__ == "__main__": help="set experiment date as string") args = parser.parse_args() - main() + experiment = ExperimentSetup(args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']) + a = 1 + # main() diff --git a/src/experiment_setup.py b/src/experiment_setup.py deleted file mode 100644 index 4fc145732ecbce653e617060a9b38edc359c7a43..0000000000000000000000000000000000000000 --- a/src/experiment_setup.py +++ /dev/null @@ -1,94 +0,0 @@ -__author__ = "Lukas Leufen" -__date__ = '2019-11-15' - - -from src import helpers -import logging -import argparse - - -class ExperimentSetup(object): - """ - params: - trainable: Train new model if true, otherwise try to load existing model - """ - - def __init__(self, parser_args, **kwargs): - self.args = self._set_parser_args(parser_args) - self.data_path = None - self.experiment_path = None - self.experiment_name = None - self.trainable = None - self.fraction_of_train = None - self.use_all_stations_on_all_data_sets = None - self.network = None - self.var_all_dict = None - self.stations = None - self.variables = None - self.dimensions = None - self.interpolate_dim = None - self.target_dim = None - self.target_var = None - self.train_kwargs = None - self.val_kwargs = None - self.test_kwargs = None - self.setup_experiment(**kwargs) - - def _set_param(self, param, value, default=None): - if default is not None: - value = value.get(param, default) - setattr(self, param, value) - logging.debug(f"set experiment attribute: {param}={value}") - - @staticmethod - def _set_parser_args(args): - """ - Transform args to dict if given as argparse.Namespace - :param args: - :return: - """ - if isinstance(args, argparse.Namespace): - return args.__dict__ - return args - - def setup_experiment(self, **kwargs): - - # set data path of this experiment - self._set_param("data_path", helpers.prepare_host()) - - # set experiment name - exp_date = self.args.get("experiment_date") - exp_name, exp_path = helpers.set_experiment_name(experiment_date=exp_date) - self._set_param("experiment_name", exp_name) - self._set_param("experiment_path", exp_path) - helpers.check_path_and_create(self.experiment_path) - - # set if model is trainable - self._set_param("trainable", kwargs, default=True) - - # set fraction of train - self._set_param("fraction_of_train", kwargs, default=0.8) - - # use all stations on all data sets (train, val, test) - self._set_param("use_all_stations_on_all_data_sets", kwargs, default=True) - self._set_param("network", kwargs, default="AIRBASE") - self._set_param("var_all_dict", kwargs, - default={'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', - 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', - 'pblheight': 'maximum'}) - self._set_param("stations", kwargs, - default=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBY052', 'DEBY032', 'DEBW022', - 'DEBY004', 'DEBY020', 'DEBW030', 'DEBW037', 'DEBW031', 'DEBW015', 'DEBW073', 'DEBY039', - 'DEBW038', 'DEBW081', 'DEBY075', 'DEBW040', 'DEBY053', 'DEBW059', 'DEBW027', 'DEBY072', - 'DEBW042', 'DEBW039', 'DEBY001', 'DEBY113', 'DEBY089', 'DEBW024', 'DEBW004', 'DEBY037', - 'DEBW056', 'DEBW029', 'DEBY068', 'DEBW010', 'DEBW026', 'DEBY002', 'DEBY079', 'DEBW084', - 'DEBY049', 'DEBY031', 'DEBW019', 'DEBW001', 'DEBY063', 'DEBY005', 'DEBW046', 'DEBW103', - 'DEBW052', 'DEBW034', 'DEBY088', ]) - self._set_param("variables", kwargs, default=list(self.var_all_dict.keys())) - self._set_param("dimensions", kwargs, default={'new_index': ['datetime', 'Stations']}) - self._set_param("interpolate_dim", kwargs, default='datetime') - self._set_param("target_dim", kwargs, default='variables') - self._set_param("target_var", kwargs, default="o3") - self._set_param("train_kwargs", kwargs, default={"start": "1997-01-01", "end": "2007-12-31"}) - self._set_param("val_kwargs", kwargs, default={"start": "2008-01-01", "end": "2009-12-31"}) - self._set_param("test_kwargs", kwargs, default={"start": "2010-01-01", "end": "2017-12-31"}) diff --git a/src/helpers.py b/src/helpers.py index 83c5dcf44ad6462b9469eaf218426d43f7b2e91e..1b635a2e9cd00bfbb3650a5a7a8768378cfcdb6b 100644 --- a/src/helpers.py +++ b/src/helpers.py @@ -1,3 +1,5 @@ +import re + __author__ = 'Lukas Leufen' __date__ = '2019-10-21' @@ -180,3 +182,16 @@ def set_experiment_name(experiment_date=None, experiment_path=None): else: experiment_path = os.path.abspath(experiment_path) return experiment_name, experiment_path + + +class PyTestRegex: + """Assert that a given string meets some expectations.""" + + def __init__(self, pattern: str, flags: int = 0): + self._regex = re.compile(pattern, flags) + + def __eq__(self, actual: str) -> bool: + return bool(self._regex.match(actual)) + + def __repr__(self) -> str: + return self._regex.pattern diff --git a/src/modules.py b/src/modules.py deleted file mode 100644 index 01f7ed677c426ba7a6a0c180aeaef5de6257ae77..0000000000000000000000000000000000000000 --- a/src/modules.py +++ /dev/null @@ -1,195 +0,0 @@ -from src.helpers import TimeTracking -import logging -import time -from src.data_generator import DataGenerator -from src.experiment_setup import ExperimentSetup -import argparse -from typing import Dict, List, Any, Tuple - - -class run(object): - """ - basic run class to measure execution time. Either call this class calling it by 'with' or delete the class instance - after finishing the measurement. The duration result is logged. - """ - - del_by_exit = False - - def __init__(self): - """ - Starts time tracking automatically and logs as info. - """ - self.time = TimeTracking() - logging.info(f"{self.__class__.__name__} started") - - def __del__(self): - """ - This is the class finalizer. The code is not executed if already called by exit method to prevent duplicated - logging (__exit__ is always executed before __del__) it this class was used in a with statement. - """ - if not self.del_by_exit: - self.time.stop() - logging.info(f"{self.__class__.__name__} finished after {self.time}") - self.del_by_exit = True - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.__del__() - - @staticmethod - def do_stuff(length=2): - time.sleep(length) - - -class PreProcessing(run): - - """ - Pre-process your data by using this class. It includes time tracking and uses the experiment setup to look for data - and stores it if not already in local disk. Further, it provides this data as a generator and checks for valid - stations (in this context: valid=data available). Finally, it splits the data into valid training, validation and - testing subsets. - """ - - def __init__(self, experiment_setup: ExperimentSetup): - super().__init__() - self.setup = experiment_setup - self.kwargs = None - self.valid_stations = [] - self._run() - - def _run(self): - kwargs = {'start': '1997-01-01', 'end': '2017-12-31', 'limit_nan_fill': 1, 'window_history': 13, - 'window_lead_time': 3, 'interpolate_method': 'linear', - 'statistics_per_var': self.setup.var_all_dict, } - args = self.setup.__dict__ - valid_stations = self.check_valid_stations(args, kwargs, self.setup.stations) - args = self.update_key(args, "stations", valid_stations) - data_gen = DataGenerator(**args, **kwargs) - train, val, test = self.split_train_val_test(data_gen, valid_stations, args, kwargs) - # print stats of data - - def split_train_val_test(self, data, stations, args, kwargs): - train_index, val_index, test_index = self.split_set_indices(len(stations), args["fraction_of_training"]) - train = self.create_set_split(stations, args, kwargs, train_index, "train") - val = self.create_set_split(stations, args, kwargs, val_index, "val") - test = self.create_set_split(stations, args, kwargs, test_index, "test") - return train, val, test - - @staticmethod - def split_set_indices(total_length: int, fraction: float) -> Tuple[slice, slice, slice]: - """ - create the training, validation and test subset slice indices for given total_length. The test data consists on - (1-fraction) of total_length (fraction*len:end). Train and validation data therefore are made from fraction of - total_length (0:fraction*len). Train and validation data is split by the factor 0.8 for train and 0.2 for - validation. - :param total_length: list with all objects to split - :param fraction: ratio between test and union of train/val data - :return: slices for each subset in the order: train, val, test - """ - pos_test_split = int(total_length * fraction) - train_index = slice(0, int(pos_test_split * 0.8)) - val_index = slice(int(pos_test_split * 0.8), pos_test_split) - test_index = slice(pos_test_split, total_length) - return train_index, val_index, test_index - - def create_set_split(self, stations, args, kwargs, index_list, set_name): - if args["use_all_stations_on_all_data_sets"]: - set_stations = stations - else: - set_stations = stations[index_list] - logging.debug(f"{set_name.capitalize()} stations (len={set_stations}): {set_stations}") - set_kwargs = self.update_kwargs(args, kwargs, f"{set_name}_kwargs") - set_stations = self.check_valid_stations(args, set_kwargs, set_stations) - set_args = self.update_key(args, "stations", set_stations) - data_set = DataGenerator(**set_args, **set_kwargs) - return data_set - - @staticmethod - def update_key(orig_dict: Dict, key: str, value: Any) -> Dict: - """ - create copy of `orig_dict` and update given key by value, returns a copied dict. The original input dict - `orig_dict` is not modified by this function. - :param orig_dict: dictionary with arguments that should be updated - :param key: the key to update - :param value: the update itself for given key - :return: updated dict - """ - updated = orig_dict.copy() - updated.update({key: value}) - return updated - - @staticmethod - def update_kwargs(args: Dict, kwargs: Dict, kwargs_name: str): - """ - copy kwargs and update kwargs parameters by another dictionary stored in args. Not existing keys in kwargs are - created, existing keys overwritten. - :param args: dict with the new kwargs parameters stored with key `kwargs_name` - :param kwargs: dict to update - :param kwargs_name: key in `args` to find the updates for `kwargs` - :return: updated kwargs dict - """ - kwargs_updated = kwargs.copy() - if kwargs_name in args.keys() and args[kwargs_name]: - kwargs_updated.update(args[kwargs_name]) - return kwargs_updated - - @staticmethod - def check_valid_stations(args: Dict, kwargs: Dict, all_stations: List[str]): - """ - Check if all given stations in `all_stations` are valid. Valid means, that there is data available for the given - time range (is included in `kwargs`). The shape and the loading time are logged in debug mode. - :param args: Dictionary with required parameters for DataGenerator class (`data_path`, `network`, `stations`, - `variables`, `interpolate_dim`, `target_dim`, `target_var`). - :param kwargs: positional parameters for the DataGenerator class (e.g. `start`, `interpolate_method`, - `window_lead_time`). - :param all_stations: All stations to check. - :return: Corrected list containing only valid station IDs. - """ - t_outer = TimeTracking() - t_inner = TimeTracking(start=False) - logging.info("check valid stations started") - valid_stations = [] - - # all required arguments of the DataGenerator can be found in args, positional arguments in args and kwargs - data_gen = DataGenerator(**args, **kwargs) - for station in all_stations: - t_inner.run() - try: - (history, label) = data_gen[station] - valid_stations.append(station) - logging.debug(f"{station}: history_shape = {history.shape}") - logging.debug(f"{station}: loading time = {t_inner}") - except AttributeError: - continue - logging.info(f"run for {t_outer} to check {len(all_stations)} station(s)") - return valid_stations - - -class Training(run): - - def __init__(self, setup): - super().__init__() - self.setup = setup - - -class PostProcessing(run): - - def __init__(self, setup): - super().__init__() - self.setup = setup - - -if __name__ == "__main__": - - formatter = '%(asctime)s - %(levelname)s: %(message)s [%(filename)s:%(funcName)s:%(lineno)s]' - logging.basicConfig(format=formatter, level=logging.DEBUG) - - parser = argparse.ArgumentParser() - parser.add_argument('--experiment_date', metavar='--exp_date', type=str, nargs=1, default=None, - help="set experiment date as string") - parser_args = parser.parse_args() - with run(): - setup = ExperimentSetup(parser_args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']) - PreProcessing(setup) diff --git a/src/modules/__init__.py b/src/modules/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/modules/experiment_setup.py b/src/modules/experiment_setup.py new file mode 100644 index 0000000000000000000000000000000000000000..f81d2a5b7ff2c7ab477454ee34d77f2c15381dd4 --- /dev/null +++ b/src/modules/experiment_setup.py @@ -0,0 +1,119 @@ +__author__ = "Lukas Leufen" +__date__ = '2019-11-15' + + +import logging +import argparse +from typing import Union, Dict, Any + +from src import helpers +from src.modules.run_environment import RunEnvironment + + +DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBY052', 'DEBY032', 'DEBW022', 'DEBY004', + 'DEBY020', 'DEBW030', 'DEBW037', 'DEBW031', 'DEBW015', 'DEBW073', 'DEBY039', 'DEBW038', 'DEBW081', + 'DEBY075', 'DEBW040', 'DEBY053', 'DEBW059', 'DEBW027', 'DEBY072', 'DEBW042', 'DEBW039', 'DEBY001', + 'DEBY113', 'DEBY089', 'DEBW024', 'DEBW004', 'DEBY037', 'DEBW056', 'DEBW029', 'DEBY068', 'DEBW010', + 'DEBW026', 'DEBY002', 'DEBY079', 'DEBW084', 'DEBY049', 'DEBY031', 'DEBW019', 'DEBW001', 'DEBY063', + 'DEBY005', 'DEBW046', 'DEBW103', 'DEBW052', 'DEBW034', 'DEBY088', ] +DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', + 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', + 'pblheight': 'maximum'} + + +class ExperimentSetup(RunEnvironment): + """ + params: + trainable: Train new model if true, otherwise try to load existing model + """ + + def __init__(self, parser_args=None, var_all_dict=None, stations=None, network=None, variables=None, + statistics_per_var=None, start=None, end=None, window_history=None, target_var="o3", target_dim=None, + window_lead_time=None, dimensions=None, interpolate_dim=None, interpolate_method=None, + limit_nan_fill=None, train_start=None, train_end=None, val_start=None, val_end=None, test_start=None, + test_end=None, use_all_stations_on_all_data_sets=True, trainable=False, fraction_of_train=None, + experiment_path=None): + + # create run framework + super().__init__() + + # experiment setup + self._set_param("data_path", helpers.prepare_host()) + self._set_param("trainable", trainable, default=False) + self._set_param("fraction_of_train", fraction_of_train, default=0.8) + + # set experiment name + exp_date = self._get_parser_args(parser_args).get("experiment_date") + exp_name, exp_path = helpers.set_experiment_name(experiment_date=exp_date, experiment_path=experiment_path) + self._set_param("experiment_name", exp_name) + self._set_param("experiment_path", exp_path) + helpers.check_path_and_create(self.data_store.get("experiment_path", "general")) + + # setup for data + self._set_param("var_all_dict", var_all_dict, default=DEFAULT_VAR_ALL_DICT) + self._set_param("stations", stations, default=DEFAULT_STATIONS) + self._set_param("network", network, default="AIRBASE") + self._set_param("variables", variables, default=list(self.data_store.get("var_all_dict", "general").keys())) + self._set_param("statistics_per_var", statistics_per_var, default=self.data_store.get("var_all_dict", "general")) + self._set_param("start", start, default="1997-01-01", scope="general") + self._set_param("end", end, default="2017-12-31", scope="general") + self._set_param("window_history", window_history, default=13) + + # target + self._set_param("target_var", target_var, default="o3") + self._set_param("target_dim", target_dim, default='variables') + self._set_param("window_lead_time", window_lead_time, default=3) + + # interpolation + self._set_param("dimensions", dimensions, default={'new_index': ['datetime', 'Stations']}) + self._set_param("interpolate_dim", interpolate_dim, default='datetime') + self._set_param("interpolate_method", interpolate_method, default='linear') + self._set_param("limit_nan_fill", limit_nan_fill, default=1) + + # train parameters + self._set_param("start", train_start, default="1997-01-01", scope="general.train") + self._set_param("end", train_end, default="2007-12-31", scope="general.train") + + # validation parameters + self._set_param("start", val_start, default="2008-01-01", scope="general.val") + self._set_param("end", val_end, default="2009-12-31", scope="general.val") + + # test parameters + self._set_param("start", test_start, default="2010-01-01", scope="general.test") + self._set_param("end", test_end, default="2017-12-31", scope="general.test") + + # use all stations on all data sets (train, val, test) + self._set_param("use_all_stations_on_all_data_sets", use_all_stations_on_all_data_sets, default=True) + + def _set_param(self, param: str, value: Any, default: Any = None, scope: str = "general") -> None: + if value is None and default is not None: + value = default + self.data_store.put(param, value, scope) + logging.debug(f"set experiment attribute: {param}({scope})={value}") + + @staticmethod + def _get_parser_args(args: Union[Dict, argparse.Namespace]) -> Dict: + """ + Transform args to dict if given as argparse.Namespace + :param args: either a dictionary or an argument parser instance + :return: dictionary with all arguments + """ + if isinstance(args, argparse.Namespace): + return args.__dict__ + elif isinstance(args, dict): + return args + else: + return {} + + +if __name__ == "__main__": + + formatter = '%(asctime)s - %(levelname)s: %(message)s [%(filename)s:%(funcName)s:%(lineno)s]' + logging.basicConfig(format=formatter, level=logging.DEBUG) + + parser = argparse.ArgumentParser() + parser.add_argument('--experiment_date', metavar='--exp_date', type=str, nargs=1, default=None, + help="set experiment date as string") + parser_args = parser.parse_args() + with RunEnvironment(): + setup = ExperimentSetup(parser_args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']) diff --git a/src/modules/modules.py b/src/modules/modules.py new file mode 100644 index 0000000000000000000000000000000000000000..8532e1d812a5de7a3b47423d9f9bb3c9bcd43abc --- /dev/null +++ b/src/modules/modules.py @@ -0,0 +1,33 @@ +import logging +# from src.experiment_setup import ExperimentSetup +import argparse + +from src.modules.run_environment import RunEnvironment + + +class Training(RunEnvironment): + + def __init__(self, setup): + super().__init__() + self.setup = setup + + +class PostProcessing(RunEnvironment): + + def __init__(self, setup): + super().__init__() + self.setup = setup + + +if __name__ == "__main__": + + formatter = '%(asctime)s - %(levelname)s: %(message)s [%(filename)s:%(funcName)s:%(lineno)s]' + logging.basicConfig(format=formatter, level=logging.DEBUG) + + parser = argparse.ArgumentParser() + parser.add_argument('--experiment_date', metavar='--exp_date', type=str, nargs=1, default=None, + help="set experiment date as string") + parser_args = parser.parse_args() + # with run(): + # setup = ExperimentSetup(parser_args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']) + # PreProcessing(setup) diff --git a/src/modules/pre_processing.py b/src/modules/pre_processing.py new file mode 100644 index 0000000000000000000000000000000000000000..aeed05fccab27a4787f23019f7eec391a6564297 --- /dev/null +++ b/src/modules/pre_processing.py @@ -0,0 +1,120 @@ +import logging +from typing import Any, Tuple, Dict, List + +from src.data_generator import DataGenerator +from src.helpers import TimeTracking +from src.modules.run_environment import RunEnvironment +from src.datastore import NameNotFoundInDataStore + + +DEFAULT_ARGS_LIST = ["data_path", "network", "stations", "variables", "interpolate_dim", "target_dim", "target_var"] +DEFAULT_KWARGS_LIST = ["limit_nan_fill", "window_history", "window_lead_time"] + + +class PreProcessing(RunEnvironment): + + """ + Pre-process your data by using this class. It includes time tracking and uses the experiment setup to look for data + and stores it if not already in local disk. Further, it provides this data as a generator and checks for valid + stations (in this context: valid=data available). Finally, it splits the data into valid training, validation and + testing subsets. + """ + + def __init__(self): + + # create run framework + super().__init__() + + # + self._run() + + def _create_args_dict(self, arg_list, scope="general"): + args = {} + for arg in arg_list: + try: + args[arg] = self.data_store.get(arg, scope) + except NameNotFoundInDataStore: + pass + return args + + def _run(self): + + args = self._create_args_dict(DEFAULT_ARGS_LIST) + kwargs = self._create_args_dict(DEFAULT_KWARGS_LIST) + valid_stations = self.check_valid_stations(args, kwargs, self.data_store.get("stations", "general")) + self.data_store.put("stations", valid_stations) + self.split_train_val_test() + + def split_train_val_test(self): + fraction_of_training = self.data_store.get("fraction_of_training", "general") + stations = self.data_store.get("stations", "general") + train_index, val_index, test_index = self.split_set_indices(len(stations), fraction_of_training) + for (ind, scope) in zip([train_index, val_index, test_index], ["train", "val", "test"]): + self.create_set_split(ind, scope) + # self.create_set_split(train_index, "train") + # self.create_set_split(val_index, "val") + # self.create_set_split(test_index, "test") + + @staticmethod + def split_set_indices(total_length: int, fraction: float) -> Tuple[slice, slice, slice]: + """ + create the training, validation and test subset slice indices for given total_length. The test data consists on + (1-fraction) of total_length (fraction*len:end). Train and validation data therefore are made from fraction of + total_length (0:fraction*len). Train and validation data is split by the factor 0.8 for train and 0.2 for + validation. + :param total_length: list with all objects to split + :param fraction: ratio between test and union of train/val data + :return: slices for each subset in the order: train, val, test + """ + pos_test_split = int(total_length * fraction) + train_index = slice(0, int(pos_test_split * 0.8)) + val_index = slice(int(pos_test_split * 0.8), pos_test_split) + test_index = slice(pos_test_split, total_length) + return train_index, val_index, test_index + + def create_set_split(self, index_list, set_name): + scope = f"general.{set_name}" + args = self._create_args_dict(DEFAULT_ARGS_LIST, scope) + kwargs = self._create_args_dict(DEFAULT_KWARGS_LIST, scope) + stations = args["stations"] + if args["use_all_stations_on_all_data_sets"]: + set_stations = stations + else: + set_stations = stations[index_list] + logging.debug(f"{set_name.capitalize()} stations (len={set_stations}): {set_stations}") + set_stations = self.check_valid_stations(args, kwargs, set_stations) + self.data_store.put("stations", set_stations, scope) + set_args = self._create_args_dict(DEFAULT_ARGS_LIST, scope) + data_set = DataGenerator(**set_args, **kwargs) + self.data_store.put("generator", data_set, scope) + + @staticmethod + def check_valid_stations(args: Dict, kwargs: Dict, all_stations: List[str]): + """ + Check if all given stations in `all_stations` are valid. Valid means, that there is data available for the given + time range (is included in `kwargs`). The shape and the loading time are logged in debug mode. + :param args: Dictionary with required parameters for DataGenerator class (`data_path`, `network`, `stations`, + `variables`, `interpolate_dim`, `target_dim`, `target_var`). + :param kwargs: positional parameters for the DataGenerator class (e.g. `start`, `interpolate_method`, + `window_lead_time`). + :param all_stations: All stations to check. + :return: Corrected list containing only valid station IDs. + """ + t_outer = TimeTracking() + t_inner = TimeTracking(start=False) + logging.info("check valid stations started") + valid_stations = [] + + # all required arguments of the DataGenerator can be found in args, positional arguments in args and kwargs + data_gen = DataGenerator(**args, **kwargs) + for station in all_stations: + t_inner.run() + try: + (history, label) = data_gen[station] + valid_stations.append(station) + logging.debug(f"{station}: history_shape = {history.shape}") + logging.debug(f"{station}: loading time = {t_inner}") + except AttributeError: + continue + logging.info(f"run for {t_outer} to check {len(all_stations)} station(s)") + return valid_stations diff --git a/src/modules/run_environment.py b/src/modules/run_environment.py new file mode 100644 index 0000000000000000000000000000000000000000..b0aa77d50fc4fbc10b3b9e4debfe2ae5173d2a22 --- /dev/null +++ b/src/modules/run_environment.py @@ -0,0 +1,45 @@ +__author__ = "Lukas Leufen" +__date__ = '2019-11-25' + +import logging +import time + +from src.datastore import DataStoreByScope as DataStoreObject +from src.helpers import TimeTracking + + +class RunEnvironment(object): + """ + basic run class to measure execution time. Either call this class calling it by 'with' or delete the class instance + after finishing the measurement. The duration result is logged. + """ + + del_by_exit = False + data_store = DataStoreObject() + + def __init__(self): + """ + Starts time tracking automatically and logs as info. + """ + self.time = TimeTracking() + logging.info(f"{self.__class__.__name__} started") + + def __del__(self): + """ + This is the class finalizer. The code is not executed if already called by exit method to prevent duplicated + logging (__exit__ is always executed before __del__) it this class was used in a with statement. + """ + if not self.del_by_exit: + self.time.stop() + logging.info(f"{self.__class__.__name__} finished after {self.time}") + self.del_by_exit = True + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.__del__() + + @staticmethod + def do_stuff(length=2): + time.sleep(length) diff --git a/test/test_modules.py b/test/test_modules.py index 3211a8e855e8b342b42b1b86793df44fd061c179..b28b04f643122b019e912540f228c8ed20be9eeb 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -1,124 +1,3 @@ -import pytest -import logging -from src.modules import run, PreProcessing -from src.helpers import TimeTracking -import src.helpers -from src.experiment_setup import ExperimentSetup -from src.data_generator import DataGenerator -import re -import mock -import numpy as np -class pytest_regex: - """Assert that a given string meets some expectations.""" - def __init__(self, pattern, flags=0): - self._regex = re.compile(pattern, flags) - - def __eq__(self, actual): - return bool(self._regex.match(actual)) - - def __repr__(self): - return self._regex.pattern - - -class TestRun: - - def test_enter_exit(self, caplog): - caplog.set_level(logging.INFO) - with run() as r: - assert caplog.record_tuples[-1] == ('root', 20, 'run started') - assert isinstance(r.time, TimeTracking) - r.do_stuff(0.1) - assert caplog.record_tuples[-1] == ('root', 20, pytest_regex(r"run finished after \d+\.\d+s")) - - def test_init_del(self, caplog): - caplog.set_level(logging.INFO) - r = run() - assert caplog.record_tuples[-1] == ('root', 20, 'run started') - r.do_stuff(0.2) - del r - assert caplog.record_tuples[-1] == ('root', 20, pytest_regex(r"run finished after \d+\.\d+s")) - - -class TestPreProcessing: - - def test_init(self, caplog): - caplog.set_level(logging.INFO) - setup = ExperimentSetup({}, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'], - var_all_dict={'o3': 'dma8eu', 'temp': 'maximum'}) - pre = PreProcessing(setup) - assert caplog.record_tuples[0] == ('root', 20, 'PreProcessing started') - assert caplog.record_tuples[1] == ('root', 20, 'check valid stations started') - assert caplog.record_tuples[-1] == ('root', 20, pytest_regex(r'run for \d+\.\d+s to check 5 station\(s\)')) - - def test_run(self): - pre_processing = object.__new__(PreProcessing) - pre_processing.time = TimeTracking() - pre_processing.setup = ExperimentSetup({}, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'], - var_all_dict={'o3': 'dma8eu', 'temp': 'maximum'}) - assert pre_processing._run() is None - - def test_split_train_val_test(self): - pass - - def test_check_valid_stations(self, caplog): - caplog.set_level(logging.INFO) - pre = object.__new__(PreProcessing) - pre.time = TimeTracking() - pre.setup = ExperimentSetup({}, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'], - var_all_dict={'o3': 'dma8eu', 'temp': 'maximum'}) - kwargs = {'start': '1997-01-01', 'end': '2017-12-31', 'limit_nan_fill': 1, 'window_history': 13, - 'window_lead_time': 3, 'interpolate_method': 'linear', - 'statistics_per_var': pre.setup.var_all_dict, } - valids = pre.check_valid_stations(pre.setup.__dict__, kwargs, pre.setup.stations) - assert valids == pre.setup.stations - assert caplog.record_tuples[0] == ('root', 20, 'check valid stations started') - assert caplog.record_tuples[1] == ('root', 20, pytest_regex(r'run for \d+\.\d+s to check 5 station\(s\)')) - - def test_update_kwargs(self): - args = {"testName": {"testAttribute": "TestValue", "optional": "2019-11-21"}} - kwargs = {"testAttribute": "DefaultValue", "defaultAttribute": 3} - updated = PreProcessing.update_kwargs(args, kwargs, "testName") - assert updated == {"testAttribute": "TestValue", "defaultAttribute": 3, "optional": "2019-11-21"} - assert kwargs == {"testAttribute": "DefaultValue", "defaultAttribute": 3} - args = {"testName": None} - updated = PreProcessing.update_kwargs(args, kwargs, "testName") - assert updated == {"testAttribute": "DefaultValue", "defaultAttribute": 3} - args = {"dummy": "notMeaningful"} - updated = PreProcessing.update_kwargs(args, kwargs, "testName") - assert updated == {"testAttribute": "DefaultValue", "defaultAttribute": 3} - - def test_update_key(self): - orig_dict = {"Test1": 3, "Test2": "4", "test3": [1, 2, 3]} - f = PreProcessing.update_key - assert f(orig_dict, "Test2", 4) == {"Test1": 3, "Test2": 4, "test3": [1, 2, 3]} - assert orig_dict == {"Test1": 3, "Test2": "4", "test3": [1, 2, 3]} - assert f(orig_dict, "Test3", 4) == {"Test1": 3, "Test2": "4", "test3": [1, 2, 3], "Test3": 4} - - def test_split_set_indices(self): - dummy_list = list(range(0, 15)) - train, val, test = PreProcessing.split_set_indices(len(dummy_list), 0.9) - assert dummy_list[train] == list(range(0, 10)) - assert dummy_list[val] == list(range(10, 13)) - assert dummy_list[test] == list(range(13, 15)) - - @mock.patch("DataGenerator", return_value=object.__new__(DataGenerator)) - @mock.patch("DataGenerator[station]", return_value=(np.ones(10), np.zeros(10))) - def test_create_set_split(self): - stations = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'] - pre = object.__new__(PreProcessing) - pre.setup = ExperimentSetup({}, stations=stations, var_all_dict={'o3': 'dma8eu', 'temp': 'maximum'}, - train_kwargs={"start": "2000-01-01", "end": "2007-12-31"}) - kwargs = {'start': '1997-01-01', 'end': '2017-12-31', 'statistics_per_var': pre.setup.var_all_dict, } - train = pre.create_set_split(stations, pre.setup.__dict__, kwargs, slice(0, 3), "train") - # stopped here. It is a mess with all the different kwargs, args etc. Restructure the idea of how to implement - # the data sets. Because there are multiple kwargs declarations and which counts in the end. And there are - # multiple declarations of the DataGenerator class. Why this? Is it somehow possible, to select elements from - # this iterator class. Furthermore the names of the DataPrep class is not distinct, because there is no time - # range provided in file's name. Given the case, that first to total DataGen is called with a short period for - # data loading. But then, for the data split (I don't know why this could happen, but it is very likely because - # osf the current multiple declarations of kwargs arguments) the desired time range exceeds the previou - # mentioned and short time range. But nevertheless, the file with the short period is loaded and used (because - # during DataPrep loading, the available range is checked). diff --git a/test/test_modules/test_experiment_setup.py b/test/test_modules/test_experiment_setup.py new file mode 100644 index 0000000000000000000000000000000000000000..832ff45a0a3b1384e1300c0fa38ed3d1ec2204b8 --- /dev/null +++ b/test/test_modules/test_experiment_setup.py @@ -0,0 +1,147 @@ +import pytest +import logging +import argparse +import os + +from src.modules.experiment_setup import ExperimentSetup +from src.helpers import TimeTracking, prepare_host +from src.datastore import NameNotFoundInScope, NameNotFoundInDataStore + + +class TestExperimentSetup: + + @pytest.fixture + def empty_obj(self, caplog): + obj = object.__new__(ExperimentSetup) + obj.time = TimeTracking() + caplog.set_level(logging.DEBUG) + return obj + + def test_set_param_by_value(self, caplog, empty_obj): + empty_obj._set_param("23tester", 23) + assert caplog.record_tuples[-1] == ('root', 10, 'set experiment attribute: 23tester(general)=23') + assert empty_obj.data_store.get("23tester", "general") == 23 + + def test_set_param_by_value_and_scope(self, caplog, empty_obj): + empty_obj._set_param("109tester", 109, "general.testing") + assert empty_obj.data_store.get("109tester", "general.tester") == 109 + + def test_set_param_with_default(self, caplog, empty_obj): + empty_obj._set_param("NoneTester", None, "notNone", "general.testing") + assert empty_obj.data_store.get("NoneTester", "general.testing") == "notNone" + empty_obj._set_param("AnotherNoneTester", None) + assert empty_obj.data_store.get("AnotherNoneTester", "general") is None + + def test_get_parser_args_from_dict(self, empty_obj): + res = empty_obj._get_parser_args({'test2': 2, 'test10str': "10"}) + assert res == {'test2': 2, 'test10str': "10"} + + def test_get_parser_args_from_parse_args(self, empty_obj): + parser = argparse.ArgumentParser() + parser.add_argument('--experiment_date', type=str) + parser_args = parser.parse_args(["--experiment_date", "TOMORROW"]) + assert empty_obj._get_parser_args(parser_args) == {"experiment_date": "TOMORROW"} + + def test_init_default(self): + exp_setup = ExperimentSetup() + data_store = exp_setup.data_store + # experiment setup + assert data_store.get("data_path", "general") == prepare_host() + assert data_store.get("trainable", "general") is False + assert data_store.get("fraction_of_train", "general") == 0.8 + # set experiment name + assert data_store.get("experiment_name", "general") == "TestExperiment" + path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "TestExperiment")) + assert data_store.get("experiment_path", "general") == path + default_var_all_dict = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', + 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', + 'pblheight': 'maximum'} + # setup for data + assert data_store.get("var_all_dict", "general") == default_var_all_dict + default_stations = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBY052', 'DEBY032', 'DEBW022', + 'DEBY004', 'DEBY020', 'DEBW030', 'DEBW037', 'DEBW031', 'DEBW015', 'DEBW073', 'DEBY039', + 'DEBW038', 'DEBW081', 'DEBY075', 'DEBW040', 'DEBY053', 'DEBW059', 'DEBW027', 'DEBY072', + 'DEBW042', 'DEBW039', 'DEBY001', 'DEBY113', 'DEBY089', 'DEBW024', 'DEBW004', 'DEBY037', + 'DEBW056', 'DEBW029', 'DEBY068', 'DEBW010', 'DEBW026', 'DEBY002', 'DEBY079', 'DEBW084', + 'DEBY049', 'DEBY031', 'DEBW019', 'DEBW001', 'DEBY063', 'DEBY005', 'DEBW046', 'DEBW103', + 'DEBW052', 'DEBW034', 'DEBY088', ] + assert data_store.get("stations", "general") == default_stations + assert data_store.get("network", "general") == "AIRBASE" + assert data_store.get("variables", "general") == list(default_var_all_dict.keys()) + assert data_store.get("statistics_per_var", "general") == default_var_all_dict + assert data_store.get("start", "general") == "1997-01-01" + assert data_store.get("end", "general") == "2017-12-31" + assert data_store.get("window_history", "general") == 13 + # target + assert data_store.get("target_var", "general") == "o3" + assert data_store.get("target_dim", "general") == "variables" + assert data_store.get("window_lead_time", "general") == 3 + # interpolation + assert data_store.get("dimensions", "general") == {'new_index': ['datetime', 'Stations']} + assert data_store.get("interpolate_dim", "general") == "datetime" + assert data_store.get("interpolate_method", "general") == "linear" + assert data_store.get("limit_nan_fill", "general") == 1 + # train parameters + assert data_store.get("start", "general.train") == "1997-01-01" + assert data_store.get("end", "general.train") == "2007-12-31" + # validation parameters + assert data_store.get("start", "general.val") == "2008-01-01" + assert data_store.get("end", "general.val") == "2009-12-31" + # test parameters + assert data_store.get("start", "general.test") == "2010-01-01" + assert data_store.get("end", "general.test") == "2017-12-31" + # use all stations on all data sets (train, val, test) + assert data_store.get("use_all_stations_on_all_data_sets", "general") is True + + def test_init_no_default(self): + experiment_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "data", "testExperimentFolder")) + kwargs = dict(parser_args={"experiment_date": "TODAY"}, + var_all_dict={'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum'}, + stations=['DEBY053', 'DEBW059', 'DEBW027'], network="INTERNET", variables=["o3", "temp"], + statistics_per_var=None, start="1999-01-01", end="2001-01-01", window_history=4, + target_var="temp", target_dim="target", window_lead_time=10, dimensions="dim1", + interpolate_dim="int_dim", interpolate_method="cubic", limit_nan_fill=5, train_start="2000-01-01", + train_end="2000-01-02", val_start="2000-01-03", val_end="2000-01-04", test_start="2000-01-05", + test_end="2000-01-06", use_all_stations_on_all_data_sets=False, trainable=True, + fraction_of_train=0.5, experiment_path=experiment_path) + exp_setup = ExperimentSetup(**kwargs) + data_store = exp_setup.data_store + # experiment setup + assert data_store.get("data_path", "general") == prepare_host() + assert data_store.get("trainable", "general") is True + assert data_store.get("fraction_of_train", "general") == 0.5 + # set experiment name + assert data_store.get("experiment_name", "general") == "TODAY_network/" + path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "data", "testExperimentFolder")) + assert data_store.get("experiment_path", "general") == path + # setup for data + assert data_store.get("var_all_dict", "general") == {'o3': 'dma8eu', 'relhum': 'average_values', + 'temp': 'maximum'} + assert data_store.get("stations", "general") == ['DEBY053', 'DEBW059', 'DEBW027'] + assert data_store.get("network", "general") == "INTERNET" + assert data_store.get("variables", "general") == ["o3", "temp"] + assert data_store.get("statistics_per_var", "general") == {'o3': 'dma8eu', 'relhum': 'average_values', + 'temp': 'maximum'} + assert data_store.get("start", "general") == "1999-01-01" + assert data_store.get("end", "general") == "2001-01-01" + assert data_store.get("window_history", "general") == 4 + # target + assert data_store.get("target_var", "general") == "temp" + assert data_store.get("target_dim", "general") == "target" + assert data_store.get("window_lead_time", "general") == 10 + # interpolation + assert data_store.get("dimensions", "general") == "dim1" + assert data_store.get("interpolate_dim", "general") == "int_dim" + assert data_store.get("interpolate_method", "general") == "cubic" + assert data_store.get("limit_nan_fill", "general") == 5 + # train parameters + assert data_store.get("start", "general.train") == "2000-01-01" + assert data_store.get("end", "general.train") == "2000-01-02" + # validation parameters + assert data_store.get("start", "general.val") == "2000-01-03" + assert data_store.get("end", "general.val") == "2000-01-04" + # test parameters + assert data_store.get("start", "general.test") == "2000-01-05" + assert data_store.get("end", "general.test") == "2000-01-06" + # use all stations on all data sets (train, val, test) + assert data_store.get("use_all_stations_on_all_data_sets", "general.test") is False diff --git a/test/test_modules/test_pre_processing.py b/test/test_modules/test_pre_processing.py new file mode 100644 index 0000000000000000000000000000000000000000..a1a1aa454fc788aabe6280d618009834dc9f26bf --- /dev/null +++ b/test/test_modules/test_pre_processing.py @@ -0,0 +1,87 @@ +import logging + +from src.helpers import PyTestRegex, TimeTracking +from src.modules.experiment_setup import ExperimentSetup +from src.modules.pre_processing import PreProcessing + + +class TestPreProcessing: + + def test_init(self, caplog): + caplog.set_level(logging.INFO) + setup = ExperimentSetup({}, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'], + var_all_dict={'o3': 'dma8eu', 'temp': 'maximum'}) + pre = PreProcessing(setup) + assert caplog.record_tuples[0] == ('root', 20, 'PreProcessing started') + assert caplog.record_tuples[1] == ('root', 20, 'check valid stations started') + assert caplog.record_tuples[-1] == ('root', 20, PyTestRegex(r'run for \d+\.\d+s to check 5 station\(s\)')) + + def test_run(self): + pre_processing = object.__new__(PreProcessing) + pre_processing.time = TimeTracking() + pre_processing.setup = ExperimentSetup({}, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'], + var_all_dict={'o3': 'dma8eu', 'temp': 'maximum'}) + assert pre_processing._run() is None + + def test_split_train_val_test(self): + pass + + def test_check_valid_stations(self, caplog): + caplog.set_level(logging.INFO) + pre = object.__new__(PreProcessing) + pre.time = TimeTracking() + pre.setup = ExperimentSetup({}, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'], + var_all_dict={'o3': 'dma8eu', 'temp': 'maximum'}) + kwargs = {'start': '1997-01-01', 'end': '2017-12-31', 'limit_nan_fill': 1, 'window_history': 13, + 'window_lead_time': 3, 'interpolate_method': 'linear', + 'statistics_per_var': {'o3': 'dma8eu', 'temp': 'maximum'} } + valids = pre.check_valid_stations(pre.setup.__dict__, kwargs, pre.setup.stations) + assert valids == pre.setup.stations + assert caplog.record_tuples[0] == ('root', 20, 'check valid stations started') + assert caplog.record_tuples[1] == ('root', 20, PyTestRegex(r'run for \d+\.\d+s to check 5 station\(s\)')) + + def test_update_kwargs(self): + args = {"testName": {"testAttribute": "TestValue", "optional": "2019-11-21"}} + kwargs = {"testAttribute": "DefaultValue", "defaultAttribute": 3} + updated = PreProcessing.update_kwargs(args, kwargs, "testName") + assert updated == {"testAttribute": "TestValue", "defaultAttribute": 3, "optional": "2019-11-21"} + assert kwargs == {"testAttribute": "DefaultValue", "defaultAttribute": 3} + args = {"testName": None} + updated = PreProcessing.update_kwargs(args, kwargs, "testName") + assert updated == {"testAttribute": "DefaultValue", "defaultAttribute": 3} + args = {"dummy": "notMeaningful"} + updated = PreProcessing.update_kwargs(args, kwargs, "testName") + assert updated == {"testAttribute": "DefaultValue", "defaultAttribute": 3} + + def test_update_key(self): + orig_dict = {"Test1": 3, "Test2": "4", "test3": [1, 2, 3]} + f = PreProcessing.update_key + assert f(orig_dict, "Test2", 4) == {"Test1": 3, "Test2": 4, "test3": [1, 2, 3]} + assert orig_dict == {"Test1": 3, "Test2": "4", "test3": [1, 2, 3]} + assert f(orig_dict, "Test3", 4) == {"Test1": 3, "Test2": "4", "test3": [1, 2, 3], "Test3": 4} + + def test_split_set_indices(self): + dummy_list = list(range(0, 15)) + train, val, test = PreProcessing.split_set_indices(len(dummy_list), 0.9) + assert dummy_list[train] == list(range(0, 10)) + assert dummy_list[val] == list(range(10, 13)) + assert dummy_list[test] == list(range(13, 15)) + + # @mock.patch("DataGenerator", return_value=object.__new__(DataGenerator)) + # @mock.patch("DataGenerator[station]", return_value=(np.ones(10), np.zeros(10))) + # def test_create_set_split(self): + # stations = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'] + # pre = object.__new__(PreProcessing) + # pre.setup = ExperimentSetup({}, stations=stations, var_all_dict={'o3': 'dma8eu', 'temp': 'maximum'}, + # train_kwargs={"start": "2000-01-01", "end": "2007-12-31"}) + # kwargs = {'start': '1997-01-01', 'end': '2017-12-31', 'statistics_per_var': pre.setup.var_all_dict, } + # train = pre.create_set_split(stations, pre.setup.__dict__, kwargs, slice(0, 3), "train") + # # stopped here. It is a mess with all the different kwargs, args etc. Restructure the idea of how to implement + # # the data sets. Because there are multiple kwargs declarations and which counts in the end. And there are + # # multiple declarations of the DataGenerator class. Why this? Is it somehow possible, to select elements from + # # this iterator class. Furthermore the names of the DataPrep class is not distinct, because there is no time + # # range provided in file's name. Given the case, that first to total DataGen is called with a short period for + # # data loading. But then, for the data split (I don't know why this could happen, but it is very likely because + # # osf the current multiple declarations of kwargs arguments) the desired time range exceeds the previou + # # mentioned and short time range. But nevertheless, the file with the short period is loaded and used (because + # # during DataPrep loading, the available range is checked). \ No newline at end of file diff --git a/test/test_modules/test_run_environment.py b/test/test_modules/test_run_environment.py new file mode 100644 index 0000000000000000000000000000000000000000..ce5f995e54df1ffc93c80c191376347c5f0b3741 --- /dev/null +++ b/test/test_modules/test_run_environment.py @@ -0,0 +1,31 @@ +import logging + +from src.helpers import TimeTracking, PyTestRegex +from src.modules.run_environment import RunEnvironment + + +class TestRunEnvironment: + + def test_enter(self, caplog): + caplog.set_level(logging.INFO) + with RunEnvironment() as r: + assert caplog.record_tuples[-1] == ('root', 20, 'RunEnvironment started') + assert isinstance(r.time, TimeTracking) + + def test_exit(self, caplog): + caplog.set_level(logging.INFO) + with RunEnvironment() as r: + r.do_stuff(0.1) + assert caplog.record_tuples[-1] == ('root', 20, PyTestRegex(r"RunEnvironment finished after \d+\.\d+s")) + + def test_init(self, caplog): + caplog.set_level(logging.INFO) + r = RunEnvironment() + assert caplog.record_tuples[-1] == ('root', 20, 'RunEnvironment started') + + def test_del(self, caplog): + caplog.set_level(logging.INFO) + r = RunEnvironment() + r.do_stuff(0.2) + del r + assert caplog.record_tuples[-1] == ('root', 20, PyTestRegex(r"RunEnvironment finished after \d+\.\d+s"))