diff --git a/run.py b/run.py
index 6b5c367dae776d7aa1c5f26d70b6b952b64e064d..1579ae35f0d270dc0d2529cf1b6d36bc410e317a 100644
--- a/run.py
+++ b/run.py
@@ -3,145 +3,33 @@ __date__ = '2019-11-14'
 
 import logging
-from src.helpers import TimeTracking
-from src import helpers
 import argparse
-import time
+from src.modules.experiment_setup import ExperimentSetup
+from src.modules import run, PreProcessing, Training, PostProcessing
 
-formatter = "%(asctime)s - %(levelname)s: %(message)s [%(filename)s:%(funcName)s:%(lineno)s]"
-logging.basicConfig(level=logging.INFO, format=formatter)
 
+def main():
+    with run():
+        exp_setup = ExperimentSetup(args, trainable=True, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'])
 
-class run(object):
-    """
-    basic run class to measure execution time. Either call this class calling it by 'with' or delete the class instance
-    after finishing the measurement. The duration result is logged.
-    """
-
-    def __init__(self):
-        self.time = TimeTracking()
-        logging.info(f"{self.__class__.__name__} started")
-
-    def __del__(self):
-        self.time.stop()
-        logging.info(f"{self.__class__.__name__} finished after {self.time}")
-
-    def __enter__(self):
-        pass
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        pass
-
-    def do_stuff(self):
-        time.sleep(2)
-
-
-class ExperimentSetup:
-    """
-    params:
-    trainable: Train new model if true, otherwise try to load existing model
-    """
-
-    def __init__(self, **kwargs):
-        self.data_path = None
-        self.experiment_path = None
-        self.experiment_name = None
-        self.trainable = None
-        self.fraction_of_train = None
-        self.use_all_stations_on_all_data_sets = None
-        self.network = None
-        self.var_all_dict = None
-        self.all_stations = None
-        self.variables = None
-        self.dimensions = None
-        self.dim = None
-        self.target_dim = None
-        self.target_var = None
-        self.setup_experiment(**kwargs)
-
-    def _set_param(self, param, value, default=None):
-        if default is not None:
-            value = value.get(param, default)
-        setattr(self, param, value)
-        logging.info(f"set experiment attribute: {param}={value}")
-
-    def setup_experiment(self, **kwargs):
-
-        # set data path of this experiment
-        self._set_param("data_path", helpers.prepare_host())
-
-        # set experiment name
-        exp_date = args.experiment_date
-        exp_name, exp_path = helpers.set_experiment_name(experiment_date=exp_date)
-        self._set_param("experiment_name", exp_name)
-        self._set_param("experiment_path", exp_path)
-        helpers.check_path_and_create(self.experiment_path)
-
-        # set if model is trainable
-        self._set_param("trainable", kwargs, default=True)
-
-        # set fraction of train
-        self._set_param("fraction_of_train", kwargs, default=0.8)
-
-        # use all stations on all data sets (train, val, test)
-        self._set_param("use_all_stations_on_all_data_sets", kwargs, default=True)
-        self._set_param("network", kwargs, default="AIRBASE")
-        self._set_param("var_all_dict", kwargs, default={'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum',
-                                                         'u': 'average_values', 'v': 'average_values', 'no': 'dma8eu',
-                                                         'no2': 'dma8eu', 'cloudcover': 'average_values',
-                                                         'pblheight': 'maximum'})
-        self._set_param("all_stations", kwargs, default=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087',
-                                                         'DEBY052', 'DEBY032', 'DEBW022', 'DEBY004', 'DEBY020',
-                                                         'DEBW030', 'DEBW037', 'DEBW031', 'DEBW015', 'DEBW073',
-                                                         'DEBY039', 'DEBW038', 'DEBW081', 'DEBY075', 'DEBW040',
-                                                         'DEBY053', 'DEBW059', 'DEBW027', 'DEBY072', 'DEBW042',
-                                                         'DEBW039', 'DEBY001', 'DEBY113', 'DEBY089', 'DEBW024',
-                                                         'DEBW004', 'DEBY037', 'DEBW056', 'DEBW029', 'DEBY068',
-                                                         'DEBW010', 'DEBW026', 'DEBY002', 'DEBY079', 'DEBW084',
-                                                         'DEBY049', 'DEBY031', 'DEBW019', 'DEBW001', 'DEBY063',
-                                                         'DEBY005', 'DEBW046', 'DEBW103', 'DEBW052', 'DEBW034',
-                                                         'DEBY088', ])
-        self._set_param("variables", kwargs, default=list(self.var_all_dict.keys()))
-        self._set_param("dimensions", kwargs, default={'new_index': ['datetime', 'Stations']})
-        self._set_param("dim", kwargs, default='datetime')
-        self._set_param("target_dim", kwargs, default='variables')
-        self._set_param("target_var", kwargs, default="o3")
-
-
-class PreProcessing(run):
-
-    def __init__(self, setup):
-        super().__init__()
-        self.setup = setup
-
-
-class Training(run):
-
-    def __init__(self, setup):
-        super().__init__()
-        self.setup = setup
-
+        PreProcessing(exp_setup)
 
-class PostProcessing(run):
+        Training(exp_setup)
 
-    def __init__(self, setup):
-        super().__init__()
-        self.setup = setup
+        PostProcessing(exp_setup)
 
 
 if __name__ == "__main__":
 
+    formatter = '%(asctime)s - %(levelname)s: %(message)s [%(filename)s:%(funcName)s:%(lineno)s]'
+    logging.basicConfig(format=formatter, level=logging.INFO)
+
     parser = argparse.ArgumentParser()
     parser.add_argument('--experiment_date', metavar='--exp_date', type=str, nargs=1, default=None,
                         help="set experiment date as string")
     args = parser.parse_args()
 
-    with run():
-        exp_setup = ExperimentSetup(trainable=True)
-
-        PreProcessing(exp_setup)
-
-        Training(exp_setup)
-
-        PostProcessing(exp_setup)
+    experiment = ExperimentSetup(args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'])
+    a = 1
+    # main()
diff --git a/src/data_generator.py b/src/data_generator.py
index d067e1e9e0d3225e869ca6a2944b7c749198834b..3d8a1c7c242da3d45a1b17361e210a016a419dd6 100644
--- a/src/data_generator.py
+++ b/src/data_generator.py
@@ -6,8 +6,6 @@ from src import helpers
 from src.data_preparation import DataPrep
 import os
 from typing import Union, List, Tuple
-import decimal
-import numpy as np
 import xarray as xr
 
 
@@ -20,11 +18,11 @@ class DataGenerator(keras.utils.Sequence):
     one entry of integer or string
     """
 
-    def __init__(self, path: str, network: str, stations: Union[str, List[str]], variables: List[str],
+    def __init__(self, data_path: str, network: str, stations: Union[str, List[str]], variables: List[str],
                  interpolate_dim: str, target_dim: str, target_var: str, interpolate_method: str = "linear",
                  limit_nan_fill: int = 1, window_history: int = 7, window_lead_time: int = 4,
                  transform_method: str = "standardise", **kwargs):
-        self.path = os.path.abspath(path)
+        self.data_path = os.path.abspath(data_path)
         self.network = network
         self.stations = helpers.to_list(stations)
         self.variables = variables
@@ -42,7 +40,7 @@ class DataGenerator(keras.utils.Sequence):
         """
         display all class attributes
         """
-        return f"DataGenerator(path='{self.path}', network='{self.network}', stations={self.stations}, " \
+        return f"DataGenerator(path='{self.data_path}', network='{self.network}', stations={self.stations}, " \
                f"variables={self.variables}, interpolate_dim='{self.interpolate_dim}', target_dim='{self.target_dim}'" \
                f", target_var='{self.target_var}', **{self.kwargs})"
 
@@ -96,7 +94,7 @@ class DataGenerator(keras.utils.Sequence):
         :return: preprocessed data as a DataPrep instance
         """
         station = self.get_station_key(key)
-        data = DataPrep(self.path, self.network, station, self.variables, **self.kwargs)
+        data = DataPrep(self.data_path, self.network, station, self.variables, **self.kwargs)
         data.interpolate(self.interpolate_dim, method=self.interpolate_method, limit=self.limit_nan_fill)
data.transform("datetime", method=self.transform_method) data.make_history_window(self.interpolate_dim, self.window_history) diff --git a/src/datastore.py b/src/datastore.py index 348f28549e716a30605ff2be5894fd5b03b4dc95..bb8474a04b503b1ff50fcea1b7e5f8bbd1d9ebea 100644 --- a/src/datastore.py +++ b/src/datastore.py @@ -3,7 +3,6 @@ __date__ = '2019-11-22' from typing import Any, List, Tuple - from abc import ABC @@ -80,6 +79,16 @@ class AbstractDataStore(ABC): """ pass + def list_all_names(self) -> None: + """ + List all names available in the data store. + :return: all names + """ + pass + + def clear_data_store(self) -> None: + self._store = {} + class DataStoreByVariable(AbstractDataStore): @@ -327,5 +336,3 @@ class DataStoreByScope(AbstractDataStore): if name not in names: names.append(name) return sorted(names) - - diff --git a/src/helpers.py b/src/helpers.py index 4fd5cc2b965d38a090a53dfc63e6f2d7aa582c3f..1b635a2e9cd00bfbb3650a5a7a8768378cfcdb6b 100644 --- a/src/helpers.py +++ b/src/helpers.py @@ -1,3 +1,5 @@ +import re + __author__ = 'Lukas Leufen' __date__ = '2019-10-21' @@ -11,7 +13,6 @@ import numpy as np import os import time import socket -import sys def to_list(arg): @@ -23,9 +24,9 @@ def to_list(arg): def check_path_and_create(path): try: os.makedirs(path) - logging.info(f"Created path: {path}") + logging.debug(f"Created path: {path}") except FileExistsError: - logging.info(f"Path already exists: {path}") + logging.debug(f"Path already exists: {path}") def l_p_loss(power: int): @@ -134,27 +135,39 @@ class TimeTracking(object): return self._duration() -def prepare_host(): +def prepare_host(create_new=True): hostname = socket.gethostname() - user = os.getlogin() + try: + user = os.getlogin() + except OSError: + user = "default" if hostname == 'ZAM144': path = f'/home/{user}/Data/toar_daily/' elif hostname == 'zam347': path = f'/home/{user}/Data/toar_daily/' elif hostname == 'linux-gzsx': - path = f'/home/{user}/machinelearningtools' + path = f'/home/{user}/machinelearningtools/data/toar_daily/' elif (len(hostname) > 2) and (hostname[:2] == 'jr'): path = f'/p/project/cjjsc42/{user}/DATA/toar_daily/' elif (len(hostname) > 2) and (hostname[:2] == 'jw'): path = f'/p/home/jusers/{user}/juwels/intelliaq/DATA/toar_daily/' + elif "runner-6HmDp9Qd-project-2411-concurrent" in hostname: + path = f'/home/{user}/machinelearningtools/data/toar_daily/' else: logging.error(f"unknown host '{hostname}'") raise OSError(f"unknown host '{hostname}'") if not os.path.exists(path): - logging.error(f"path '{path}' does not exist for host '{hostname}'.") - raise NotADirectoryError(f"path '{path}' does not exist for host '{hostname}'.") + try: + if create_new: + check_path_and_create(path) + return path + else: + raise PermissionError + except PermissionError: + logging.error(f"path '{path}' does not exist for host '{hostname}'.") + raise NotADirectoryError(f"path '{path}' does not exist for host '{hostname}'.") else: - logging.info(f"set path to: {path}") + logging.debug(f"set path to: {path}") return path @@ -169,3 +182,16 @@ def set_experiment_name(experiment_date=None, experiment_path=None): else: experiment_path = os.path.abspath(experiment_path) return experiment_name, experiment_path + + +class PyTestRegex: + """Assert that a given string meets some expectations.""" + + def __init__(self, pattern: str, flags: int = 0): + self._regex = re.compile(pattern, flags) + + def __eq__(self, actual: str) -> bool: + return bool(self._regex.match(actual)) + + def __repr__(self) -> str: + return 
self._regex.pattern diff --git a/src/join.py b/src/join.py index a8b8edc7d25a610db3dbbd623cf2cde162587eaa..2b13dcf41c5bc03e9dba274fd5e643c79b091cde 100644 --- a/src/join.py +++ b/src/join.py @@ -11,7 +11,6 @@ from typing import Iterator, Union, List from src import helpers join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/' -logging.basicConfig(level=logging.INFO) def download_join(station_name: Union[str, List[str]], statvar: dict) -> [pd.DataFrame, pd.DataFrame]: diff --git a/src/modules/__init__.py b/src/modules/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/modules/experiment_setup.py b/src/modules/experiment_setup.py new file mode 100644 index 0000000000000000000000000000000000000000..a76fe60b34b679b5702ec85a11f95002c3c6fe34 --- /dev/null +++ b/src/modules/experiment_setup.py @@ -0,0 +1,119 @@ +__author__ = "Lukas Leufen" +__date__ = '2019-11-15' + + +import logging +import argparse +from typing import Union, Dict, Any + +from src import helpers +from src.modules.run_environment import RunEnvironment + + +DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBY052', 'DEBY032', 'DEBW022', 'DEBY004', + 'DEBY020', 'DEBW030', 'DEBW037', 'DEBW031', 'DEBW015', 'DEBW073', 'DEBY039', 'DEBW038', 'DEBW081', + 'DEBY075', 'DEBW040', 'DEBY053', 'DEBW059', 'DEBW027', 'DEBY072', 'DEBW042', 'DEBW039', 'DEBY001', + 'DEBY113', 'DEBY089', 'DEBW024', 'DEBW004', 'DEBY037', 'DEBW056', 'DEBW029', 'DEBY068', 'DEBW010', + 'DEBW026', 'DEBY002', 'DEBY079', 'DEBW084', 'DEBY049', 'DEBY031', 'DEBW019', 'DEBW001', 'DEBY063', + 'DEBY005', 'DEBW046', 'DEBW103', 'DEBW052', 'DEBW034', 'DEBY088', ] +DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', + 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', + 'pblheight': 'maximum'} + + +class ExperimentSetup(RunEnvironment): + """ + params: + trainable: Train new model if true, otherwise try to load existing model + """ + + def __init__(self, parser_args=None, var_all_dict=None, stations=None, network=None, variables=None, + statistics_per_var=None, start=None, end=None, window_history=None, target_var="o3", target_dim=None, + window_lead_time=None, dimensions=None, interpolate_dim=None, interpolate_method=None, + limit_nan_fill=None, train_start=None, train_end=None, val_start=None, val_end=None, test_start=None, + test_end=None, use_all_stations_on_all_data_sets=True, trainable=False, fraction_of_train=None, + experiment_path=None): + + # create run framework + super().__init__() + + # experiment setup + self._set_param("data_path", helpers.prepare_host()) + self._set_param("trainable", trainable, default=False) + self._set_param("fraction_of_training", fraction_of_train, default=0.8) + + # set experiment name + exp_date = self._get_parser_args(parser_args).get("experiment_date") + exp_name, exp_path = helpers.set_experiment_name(experiment_date=exp_date, experiment_path=experiment_path) + self._set_param("experiment_name", exp_name) + self._set_param("experiment_path", exp_path) + helpers.check_path_and_create(self.data_store.get("experiment_path", "general")) + + # setup for data + self._set_param("var_all_dict", var_all_dict, default=DEFAULT_VAR_ALL_DICT) + self._set_param("stations", stations, default=DEFAULT_STATIONS) + self._set_param("network", network, default="AIRBASE") + self._set_param("variables", variables, 
default=list(self.data_store.get("var_all_dict", "general").keys())) + self._set_param("statistics_per_var", statistics_per_var, default=self.data_store.get("var_all_dict", "general")) + self._set_param("start", start, default="1997-01-01", scope="general") + self._set_param("end", end, default="2017-12-31", scope="general") + self._set_param("window_history", window_history, default=13) + + # target + self._set_param("target_var", target_var, default="o3") + self._set_param("target_dim", target_dim, default='variables') + self._set_param("window_lead_time", window_lead_time, default=3) + + # interpolation + self._set_param("dimensions", dimensions, default={'new_index': ['datetime', 'Stations']}) + self._set_param("interpolate_dim", interpolate_dim, default='datetime') + self._set_param("interpolate_method", interpolate_method, default='linear') + self._set_param("limit_nan_fill", limit_nan_fill, default=1) + + # train parameters + self._set_param("start", train_start, default="1997-01-01", scope="general.train") + self._set_param("end", train_end, default="2007-12-31", scope="general.train") + + # validation parameters + self._set_param("start", val_start, default="2008-01-01", scope="general.val") + self._set_param("end", val_end, default="2009-12-31", scope="general.val") + + # test parameters + self._set_param("start", test_start, default="2010-01-01", scope="general.test") + self._set_param("end", test_end, default="2017-12-31", scope="general.test") + + # use all stations on all data sets (train, val, test) + self._set_param("use_all_stations_on_all_data_sets", use_all_stations_on_all_data_sets, default=True) + + def _set_param(self, param: str, value: Any, default: Any = None, scope: str = "general") -> None: + if value is None and default is not None: + value = default + self.data_store.put(param, value, scope) + logging.debug(f"set experiment attribute: {param}({scope})={value}") + + @staticmethod + def _get_parser_args(args: Union[Dict, argparse.Namespace]) -> Dict: + """ + Transform args to dict if given as argparse.Namespace + :param args: either a dictionary or an argument parser instance + :return: dictionary with all arguments + """ + if isinstance(args, argparse.Namespace): + return args.__dict__ + elif isinstance(args, dict): + return args + else: + return {} + + +if __name__ == "__main__": + + formatter = '%(asctime)s - %(levelname)s: %(message)s [%(filename)s:%(funcName)s:%(lineno)s]' + logging.basicConfig(format=formatter, level=logging.DEBUG) + + parser = argparse.ArgumentParser() + parser.add_argument('--experiment_date', metavar='--exp_date', type=str, nargs=1, default=None, + help="set experiment date as string") + parser_args = parser.parse_args() + with RunEnvironment(): + setup = ExperimentSetup(parser_args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']) diff --git a/src/modules/modules.py b/src/modules/modules.py new file mode 100644 index 0000000000000000000000000000000000000000..8532e1d812a5de7a3b47423d9f9bb3c9bcd43abc --- /dev/null +++ b/src/modules/modules.py @@ -0,0 +1,33 @@ +import logging +# from src.experiment_setup import ExperimentSetup +import argparse + +from src.modules.run_environment import RunEnvironment + + +class Training(RunEnvironment): + + def __init__(self, setup): + super().__init__() + self.setup = setup + + +class PostProcessing(RunEnvironment): + + def __init__(self, setup): + super().__init__() + self.setup = setup + + +if __name__ == "__main__": + + formatter = '%(asctime)s - %(levelname)s: %(message)s 
[%(filename)s:%(funcName)s:%(lineno)s]' + logging.basicConfig(format=formatter, level=logging.DEBUG) + + parser = argparse.ArgumentParser() + parser.add_argument('--experiment_date', metavar='--exp_date', type=str, nargs=1, default=None, + help="set experiment date as string") + parser_args = parser.parse_args() + # with run(): + # setup = ExperimentSetup(parser_args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']) + # PreProcessing(setup) diff --git a/src/modules/pre_processing.py b/src/modules/pre_processing.py new file mode 100644 index 0000000000000000000000000000000000000000..d999217e9f903d3d67a24179c9f3654fee3e60d4 --- /dev/null +++ b/src/modules/pre_processing.py @@ -0,0 +1,116 @@ +import logging +from typing import Any, Tuple, Dict, List + +from src.data_generator import DataGenerator +from src.helpers import TimeTracking +from src.modules.run_environment import RunEnvironment +from src.datastore import NameNotFoundInDataStore, NameNotFoundInScope + + +DEFAULT_ARGS_LIST = ["data_path", "network", "stations", "variables", "interpolate_dim", "target_dim", "target_var"] +DEFAULT_KWARGS_LIST = ["limit_nan_fill", "window_history", "window_lead_time", "statistics_per_var"] + + +class PreProcessing(RunEnvironment): + + """ + Pre-process your data by using this class. It includes time tracking and uses the experiment setup to look for data + and stores it if not already in local disk. Further, it provides this data as a generator and checks for valid + stations (in this context: valid=data available). Finally, it splits the data into valid training, validation and + testing subsets. + """ + + def __init__(self): + + # create run framework + super().__init__() + + # + self._run() + + def _create_args_dict(self, arg_list, scope="general"): + args = {} + for arg in arg_list: + try: + args[arg] = self.data_store.get(arg, scope) + except (NameNotFoundInDataStore, NameNotFoundInScope): + pass + return args + + def _run(self): + args = self._create_args_dict(DEFAULT_ARGS_LIST) + kwargs = self._create_args_dict(DEFAULT_KWARGS_LIST) + valid_stations = self.check_valid_stations(args, kwargs, self.data_store.get("stations", "general")) + self.data_store.put("stations", valid_stations, "general") + self.split_train_val_test() + + def split_train_val_test(self): + fraction_of_training = self.data_store.get("fraction_of_training", "general") + stations = self.data_store.get("stations", "general") + train_index, val_index, test_index = self.split_set_indices(len(stations), fraction_of_training) + for (ind, scope) in zip([train_index, val_index, test_index], ["train", "val", "test"]): + self.create_set_split(ind, scope) + + @staticmethod + def split_set_indices(total_length: int, fraction: float) -> Tuple[slice, slice, slice]: + """ + create the training, validation and test subset slice indices for given total_length. The test data consists on + (1-fraction) of total_length (fraction*len:end). Train and validation data therefore are made from fraction of + total_length (0:fraction*len). Train and validation data is split by the factor 0.8 for train and 0.2 for + validation. 
+        :param total_length: list with all objects to split
+        :param fraction: ratio between test and union of train/val data
+        :return: slices for each subset in the order: train, val, test
+        """
+        pos_test_split = int(total_length * fraction)
+        train_index = slice(0, int(pos_test_split * 0.8))
+        val_index = slice(int(pos_test_split * 0.8), pos_test_split)
+        test_index = slice(pos_test_split, total_length)
+        return train_index, val_index, test_index
+
+    def create_set_split(self, index_list, set_name):
+        scope = f"general.{set_name}"
+        args = self._create_args_dict(DEFAULT_ARGS_LIST, scope)
+        kwargs = self._create_args_dict(DEFAULT_KWARGS_LIST, scope)
+        stations = args["stations"]
+        if self.data_store.get("use_all_stations_on_all_data_sets", scope):
+            set_stations = stations
+        else:
+            set_stations = stations[index_list]
+        logging.debug(f"{set_name.capitalize()} stations (len={len(set_stations)}): {set_stations}")
+        set_stations = self.check_valid_stations(args, kwargs, set_stations)
+        self.data_store.put("stations", set_stations, scope)
+        set_args = self._create_args_dict(DEFAULT_ARGS_LIST, scope)
+        data_set = DataGenerator(**set_args, **kwargs)
+        self.data_store.put("generator", data_set, scope)
+
+    @staticmethod
+    def check_valid_stations(args: Dict, kwargs: Dict, all_stations: List[str]):
+        """
+        Check if all given stations in `all_stations` are valid. Valid means that data is available for the given
+        time range (which is included in `kwargs`). The shape and the loading time are logged in debug mode.
+        :param args: Dictionary with required parameters for the DataGenerator class (`data_path`, `network`,
+            `stations`, `variables`, `interpolate_dim`, `target_dim`, `target_var`).
+        :param kwargs: further keyword parameters for the DataGenerator class (e.g. `start`, `interpolate_method`,
+            `window_lead_time`).
+        :param all_stations: All stations to check.
+        :return: Corrected list containing only valid station IDs.
+        """
+        t_outer = TimeTracking()
+        t_inner = TimeTracking(start=False)
+        logging.info("check valid stations started")
+        valid_stations = []
+
+        # all required arguments of the DataGenerator can be found in args, optional ones in kwargs
+        data_gen = DataGenerator(**args, **kwargs)
+        for station in all_stations:
+            t_inner.run()
+            try:
+                (history, label) = data_gen[station]
+                valid_stations.append(station)
+                logging.debug(f"{station}: history_shape = {history.shape}")
+                logging.debug(f"{station}: loading time = {t_inner}")
+            except AttributeError:
+                continue
+        logging.info(f"run for {t_outer} to check {len(all_stations)} station(s)")
+        return valid_stations
diff --git a/src/modules/run_environment.py b/src/modules/run_environment.py
new file mode 100644
index 0000000000000000000000000000000000000000..56c017290eea4d11881b9b131378d8c5995f0b29
--- /dev/null
+++ b/src/modules/run_environment.py
@@ -0,0 +1,47 @@
+__author__ = "Lukas Leufen"
+__date__ = '2019-11-25'
+
+import logging
+import time
+
+from src.datastore import DataStoreByScope as DataStoreObject
+from src.helpers import TimeTracking
+
+
+class RunEnvironment(object):
+    """
+    Basic run class to measure execution time. Either use this class within a 'with' statement or delete the class
+    instance after finishing the measurement. The duration result is logged.
+    """
+
+    del_by_exit = False
+    data_store = DataStoreObject()
+
+    def __init__(self):
+        """
+        Starts time tracking automatically and logs as info.
+ """ + self.time = TimeTracking() + logging.info(f"{self.__class__.__name__} started") + + def __del__(self): + """ + This is the class finalizer. The code is not executed if already called by exit method to prevent duplicated + logging (__exit__ is always executed before __del__) it this class was used in a with statement. + """ + if not self.del_by_exit: + self.time.stop() + logging.info(f"{self.__class__.__name__} finished after {self.time}") + self.del_by_exit = True + if self.__class__.__name__ == "RunEnvironment": + self.data_store.clear_data_store() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.__del__() + + @staticmethod + def do_stuff(length=2): + time.sleep(length) diff --git a/test/test_data_generator.py b/test/test_data_generator.py index ab6233f34533fa7ad35f05306f3990209b83bb82..7c745782bc057060dd439af1fb6e03c5b3ef5730 100644 --- a/test/test_data_generator.py +++ b/test/test_data_generator.py @@ -17,7 +17,7 @@ class TestDataGenerator: 'datetime', 'variables', 'o3') def test_init(self, gen): - assert gen.path == os.path.join(os.path.dirname(__file__), 'data') + assert gen.data_path == os.path.join(os.path.dirname(__file__), 'data') assert gen.network == 'UBA' assert gen.stations == ['DEBW107'] assert gen.variables == ['o3', 'temp'] diff --git a/test/test_helpers.py b/test/test_helpers.py index 742082e57e4e7374b2d35fab43a66376a90c0442..7de58f331ea4b6eeaf46b6d8b10f8ad1da487f3c 100644 --- a/test/test_helpers.py +++ b/test/test_helpers.py @@ -5,6 +5,7 @@ import os import keras import numpy as np import mock +import platform class TestToList: @@ -19,7 +20,7 @@ class TestToList: class TestCheckPath: def test_check_path_and_create(self, caplog): - caplog.set_level(logging.INFO) + caplog.set_level(logging.DEBUG) path = 'data/test' assert not os.path.exists('data/test') check_path_and_create(path) @@ -34,7 +35,7 @@ class TestLoss: def test_l_p_loss(self): model = keras.Sequential() - model.add(keras.layers.Lambda(lambda x: x, input_shape=(None, ))) + model.add(keras.layers.Lambda(lambda x: x, input_shape=(None,))) model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(2)) hist = model.fit(np.array([1, 0, 2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=1) assert hist.history['loss'][0] == 1.25 @@ -78,7 +79,7 @@ class TestLearningRateDecay: model.add(keras.layers.Dense(1, input_dim=1)) model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(2)) model.fit(np.array([1, 0, 2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=5, callbacks=[lr_decay]) - assert lr_decay.lr['lr'] == [0.02, 0.02, 0.02*0.95, 0.02*0.95, 0.02*0.95*0.95] + assert lr_decay.lr['lr'] == [0.02, 0.02, 0.02 * 0.95, 0.02 * 0.95, 0.02 * 0.95 * 0.95] class TestTimeTracking: @@ -152,7 +153,7 @@ class TestPrepareHost: @mock.patch("os.path.exists", return_value=True) def test_prepare_host(self, mock_host, mock_user, mock_path): path = prepare_host() - assert path == "/home/testUser/machinelearningtools" + assert path == "/home/testUser/machinelearningtools/data/toar_daily/" path = prepare_host() assert path == "/home/testUser/Data/toar_daily/" path = prepare_host() @@ -168,18 +169,19 @@ class TestPrepareHost: with pytest.raises(OSError) as e: prepare_host() assert "unknown host 'NotExistingHostName'" in e.value.args[0] - mock_host.return_value = "linux-gzsx" - with pytest.raises(NotADirectoryError) as e: - prepare_host() - assert "path '/home/zombie21/machinelearningtools' does not exist for host 'linux-gzsx'" in e.value.args[0] + if "runner-6HmDp9Qd-project-2411-concurrent" 
not in platform.node(): + mock_host.return_value = "linux-gzsx" + with pytest.raises(NotADirectoryError) as e: + prepare_host() + assert "does not exist for host 'linux-gzsx'" in e.value.args[0] class TestSetExperimentName: def test_set_experiment(self): exp_name, exp_path = set_experiment_name() - assert exp_name == "" - assert exp_path == os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "")) + assert exp_name == "TestExperiment" + assert exp_path == os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "TestExperiment")) exp_name, exp_path = set_experiment_name(experiment_date="2019-11-14", experiment_path="./test2") assert exp_name == "2019-11-14_network/" assert exp_path == os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test2")) diff --git a/test/test_modules.py b/test/test_modules.py new file mode 100644 index 0000000000000000000000000000000000000000..b28b04f643122b019e912540f228c8ed20be9eeb --- /dev/null +++ b/test/test_modules.py @@ -0,0 +1,3 @@ + + + diff --git a/test/test_modules/test_experiment_setup.py b/test/test_modules/test_experiment_setup.py new file mode 100644 index 0000000000000000000000000000000000000000..832ff45a0a3b1384e1300c0fa38ed3d1ec2204b8 --- /dev/null +++ b/test/test_modules/test_experiment_setup.py @@ -0,0 +1,147 @@ +import pytest +import logging +import argparse +import os + +from src.modules.experiment_setup import ExperimentSetup +from src.helpers import TimeTracking, prepare_host +from src.datastore import NameNotFoundInScope, NameNotFoundInDataStore + + +class TestExperimentSetup: + + @pytest.fixture + def empty_obj(self, caplog): + obj = object.__new__(ExperimentSetup) + obj.time = TimeTracking() + caplog.set_level(logging.DEBUG) + return obj + + def test_set_param_by_value(self, caplog, empty_obj): + empty_obj._set_param("23tester", 23) + assert caplog.record_tuples[-1] == ('root', 10, 'set experiment attribute: 23tester(general)=23') + assert empty_obj.data_store.get("23tester", "general") == 23 + + def test_set_param_by_value_and_scope(self, caplog, empty_obj): + empty_obj._set_param("109tester", 109, "general.testing") + assert empty_obj.data_store.get("109tester", "general.tester") == 109 + + def test_set_param_with_default(self, caplog, empty_obj): + empty_obj._set_param("NoneTester", None, "notNone", "general.testing") + assert empty_obj.data_store.get("NoneTester", "general.testing") == "notNone" + empty_obj._set_param("AnotherNoneTester", None) + assert empty_obj.data_store.get("AnotherNoneTester", "general") is None + + def test_get_parser_args_from_dict(self, empty_obj): + res = empty_obj._get_parser_args({'test2': 2, 'test10str': "10"}) + assert res == {'test2': 2, 'test10str': "10"} + + def test_get_parser_args_from_parse_args(self, empty_obj): + parser = argparse.ArgumentParser() + parser.add_argument('--experiment_date', type=str) + parser_args = parser.parse_args(["--experiment_date", "TOMORROW"]) + assert empty_obj._get_parser_args(parser_args) == {"experiment_date": "TOMORROW"} + + def test_init_default(self): + exp_setup = ExperimentSetup() + data_store = exp_setup.data_store + # experiment setup + assert data_store.get("data_path", "general") == prepare_host() + assert data_store.get("trainable", "general") is False + assert data_store.get("fraction_of_train", "general") == 0.8 + # set experiment name + assert data_store.get("experiment_name", "general") == "TestExperiment" + path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "TestExperiment")) + assert 
data_store.get("experiment_path", "general") == path + default_var_all_dict = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', + 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', + 'pblheight': 'maximum'} + # setup for data + assert data_store.get("var_all_dict", "general") == default_var_all_dict + default_stations = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBY052', 'DEBY032', 'DEBW022', + 'DEBY004', 'DEBY020', 'DEBW030', 'DEBW037', 'DEBW031', 'DEBW015', 'DEBW073', 'DEBY039', + 'DEBW038', 'DEBW081', 'DEBY075', 'DEBW040', 'DEBY053', 'DEBW059', 'DEBW027', 'DEBY072', + 'DEBW042', 'DEBW039', 'DEBY001', 'DEBY113', 'DEBY089', 'DEBW024', 'DEBW004', 'DEBY037', + 'DEBW056', 'DEBW029', 'DEBY068', 'DEBW010', 'DEBW026', 'DEBY002', 'DEBY079', 'DEBW084', + 'DEBY049', 'DEBY031', 'DEBW019', 'DEBW001', 'DEBY063', 'DEBY005', 'DEBW046', 'DEBW103', + 'DEBW052', 'DEBW034', 'DEBY088', ] + assert data_store.get("stations", "general") == default_stations + assert data_store.get("network", "general") == "AIRBASE" + assert data_store.get("variables", "general") == list(default_var_all_dict.keys()) + assert data_store.get("statistics_per_var", "general") == default_var_all_dict + assert data_store.get("start", "general") == "1997-01-01" + assert data_store.get("end", "general") == "2017-12-31" + assert data_store.get("window_history", "general") == 13 + # target + assert data_store.get("target_var", "general") == "o3" + assert data_store.get("target_dim", "general") == "variables" + assert data_store.get("window_lead_time", "general") == 3 + # interpolation + assert data_store.get("dimensions", "general") == {'new_index': ['datetime', 'Stations']} + assert data_store.get("interpolate_dim", "general") == "datetime" + assert data_store.get("interpolate_method", "general") == "linear" + assert data_store.get("limit_nan_fill", "general") == 1 + # train parameters + assert data_store.get("start", "general.train") == "1997-01-01" + assert data_store.get("end", "general.train") == "2007-12-31" + # validation parameters + assert data_store.get("start", "general.val") == "2008-01-01" + assert data_store.get("end", "general.val") == "2009-12-31" + # test parameters + assert data_store.get("start", "general.test") == "2010-01-01" + assert data_store.get("end", "general.test") == "2017-12-31" + # use all stations on all data sets (train, val, test) + assert data_store.get("use_all_stations_on_all_data_sets", "general") is True + + def test_init_no_default(self): + experiment_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "data", "testExperimentFolder")) + kwargs = dict(parser_args={"experiment_date": "TODAY"}, + var_all_dict={'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum'}, + stations=['DEBY053', 'DEBW059', 'DEBW027'], network="INTERNET", variables=["o3", "temp"], + statistics_per_var=None, start="1999-01-01", end="2001-01-01", window_history=4, + target_var="temp", target_dim="target", window_lead_time=10, dimensions="dim1", + interpolate_dim="int_dim", interpolate_method="cubic", limit_nan_fill=5, train_start="2000-01-01", + train_end="2000-01-02", val_start="2000-01-03", val_end="2000-01-04", test_start="2000-01-05", + test_end="2000-01-06", use_all_stations_on_all_data_sets=False, trainable=True, + fraction_of_train=0.5, experiment_path=experiment_path) + exp_setup = ExperimentSetup(**kwargs) + data_store = exp_setup.data_store + # experiment setup + assert data_store.get("data_path", "general") 
== prepare_host() + assert data_store.get("trainable", "general") is True + assert data_store.get("fraction_of_train", "general") == 0.5 + # set experiment name + assert data_store.get("experiment_name", "general") == "TODAY_network/" + path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "data", "testExperimentFolder")) + assert data_store.get("experiment_path", "general") == path + # setup for data + assert data_store.get("var_all_dict", "general") == {'o3': 'dma8eu', 'relhum': 'average_values', + 'temp': 'maximum'} + assert data_store.get("stations", "general") == ['DEBY053', 'DEBW059', 'DEBW027'] + assert data_store.get("network", "general") == "INTERNET" + assert data_store.get("variables", "general") == ["o3", "temp"] + assert data_store.get("statistics_per_var", "general") == {'o3': 'dma8eu', 'relhum': 'average_values', + 'temp': 'maximum'} + assert data_store.get("start", "general") == "1999-01-01" + assert data_store.get("end", "general") == "2001-01-01" + assert data_store.get("window_history", "general") == 4 + # target + assert data_store.get("target_var", "general") == "temp" + assert data_store.get("target_dim", "general") == "target" + assert data_store.get("window_lead_time", "general") == 10 + # interpolation + assert data_store.get("dimensions", "general") == "dim1" + assert data_store.get("interpolate_dim", "general") == "int_dim" + assert data_store.get("interpolate_method", "general") == "cubic" + assert data_store.get("limit_nan_fill", "general") == 5 + # train parameters + assert data_store.get("start", "general.train") == "2000-01-01" + assert data_store.get("end", "general.train") == "2000-01-02" + # validation parameters + assert data_store.get("start", "general.val") == "2000-01-03" + assert data_store.get("end", "general.val") == "2000-01-04" + # test parameters + assert data_store.get("start", "general.test") == "2000-01-05" + assert data_store.get("end", "general.test") == "2000-01-06" + # use all stations on all data sets (train, val, test) + assert data_store.get("use_all_stations_on_all_data_sets", "general.test") is False diff --git a/test/test_modules/test_pre_processing.py b/test/test_modules/test_pre_processing.py new file mode 100644 index 0000000000000000000000000000000000000000..bc121885ddb8ee20b0f571e7f0250845c6e99e6a --- /dev/null +++ b/test/test_modules/test_pre_processing.py @@ -0,0 +1,110 @@ +import logging +import pytest + +from src.helpers import PyTestRegex, TimeTracking +from src.modules.experiment_setup import ExperimentSetup +from src.modules.pre_processing import PreProcessing, DEFAULT_ARGS_LIST, DEFAULT_KWARGS_LIST +from src.data_generator import DataGenerator +from src.datastore import NameNotFoundInScope +from src.modules.run_environment import RunEnvironment + + +class TestPreProcessing: + + @pytest.fixture + def obj_no_init(self): + return object.__new__(PreProcessing) + + @pytest.fixture + def obj_super_init(self): + obj = object.__new__(PreProcessing) + super(PreProcessing, obj).__init__() + obj.data_store.put("NAME1", 1, "general") + obj.data_store.put("NAME2", 2, "general") + obj.data_store.put("NAME3", 3, "general") + obj.data_store.put("NAME1", 10, "general.sub") + obj.data_store.put("NAME4", 4, "general.sub.sub") + yield obj + RunEnvironment().__del__() + + @pytest.fixture + def obj_with_exp_setup(self): + ExperimentSetup(parser_args={}, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'], + var_all_dict={'o3': 'dma8eu', 'temp': 'maximum'}) + pre = object.__new__(PreProcessing) + 
+        super(PreProcessing, pre).__init__()
+        yield pre
+        RunEnvironment().__del__()
+
+    def test_init(self, caplog):
+        ExperimentSetup(parser_args={}, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'],
+                        var_all_dict={'o3': 'dma8eu', 'temp': 'maximum'})
+        caplog.set_level(logging.INFO)
+        PreProcessing()
+        assert caplog.record_tuples[0] == ('root', 20, 'PreProcessing started')
+        assert caplog.record_tuples[1] == ('root', 20, 'check valid stations started')
+        assert caplog.record_tuples[-2] == ('root', 20, PyTestRegex(r'run for \d+\.\d+s to check 5 station\(s\)'))
+        RunEnvironment().__del__()
+
+    def test_run(self, obj_with_exp_setup):
+        assert obj_with_exp_setup.data_store.search_name("generator") == []
+        assert obj_with_exp_setup._run() is None
+        assert obj_with_exp_setup.data_store.search_name("generator") == sorted(["general.train", "general.val",
+                                                                                 "general.test"])
+
+    def test_split_train_val_test(self, obj_with_exp_setup):
+        assert obj_with_exp_setup.data_store.search_name("generator") == []
+        obj_with_exp_setup.split_train_val_test()
+        data_store = obj_with_exp_setup.data_store
+        assert data_store.search_scope("general.train") == sorted(["generator", "start", "end", "stations"])
+        assert data_store.search_name("generator") == sorted(["general.train", "general.val", "general.test"])
+
+    def test_create_set_split_not_all_stations(self, caplog, obj_with_exp_setup):
+        caplog.set_level(logging.DEBUG)
+        obj_with_exp_setup.data_store.put("use_all_stations_on_all_data_sets", False, "general.awesome")
+        obj_with_exp_setup.create_set_split(slice(0, 2), "awesome")
+        assert caplog.record_tuples[0] == ('root', 10, "Awesome stations (len=2): ['DEBW107', 'DEBY081']")
+        data_store = obj_with_exp_setup.data_store
+        assert isinstance(data_store.get("generator", "general.awesome"), DataGenerator)
+        with pytest.raises(NameNotFoundInScope):
+            data_store.get("generator", "general")
+        assert data_store.get("stations", "general.awesome") == ["DEBW107", "DEBY081"]
+
+    def test_create_set_split_all_stations(self, caplog, obj_with_exp_setup):
+        caplog.set_level(logging.DEBUG)
+        obj_with_exp_setup.create_set_split(slice(0, 2), "awesome")
+        assert caplog.record_tuples[0] == ('root', 10, "Awesome stations (len=5): ['DEBW107', 'DEBY081', 'DEBW013', "
+                                                       "'DEBW076', 'DEBW087']")
+        data_store = obj_with_exp_setup.data_store
+        assert isinstance(data_store.get("generator", "general.awesome"), DataGenerator)
+        with pytest.raises(NameNotFoundInScope):
+            data_store.get("generator", "general")
+        assert data_store.get("stations", "general.awesome") == ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']
+
+    def test_check_valid_stations(self, caplog, obj_with_exp_setup):
+        pre = obj_with_exp_setup
+        caplog.set_level(logging.INFO)
+        args = pre._create_args_dict(DEFAULT_ARGS_LIST)
+        kwargs = pre._create_args_dict(DEFAULT_KWARGS_LIST)
+        stations = pre.data_store.get("stations", "general")
+        valid_stations = pre.check_valid_stations(args, kwargs, stations)
+        assert valid_stations == stations
+        assert caplog.record_tuples[0] == ('root', 20, 'check valid stations started')
+        assert caplog.record_tuples[-1] == ('root', 20, PyTestRegex(r'run for \d+\.\d+s to check 5 station\(s\)'))
+
+    def test_split_set_indices(self, obj_no_init):
+        dummy_list = list(range(0, 15))
+        train, val, test = obj_no_init.split_set_indices(len(dummy_list), 0.9)
+        assert dummy_list[train] == list(range(0, 10))
+        assert dummy_list[val] == list(range(10, 13))
+        assert dummy_list[test] == list(range(13, 15))
+
+    def test_create_args_dict_default_scope(self, obj_super_init):
+        assert obj_super_init._create_args_dict(["NAME1", "NAME2"]) == {"NAME1": 1, "NAME2": 2}
+
+    def test_create_args_dict_given_scope(self, obj_super_init):
+        assert obj_super_init._create_args_dict(["NAME1", "NAME2"], scope="general.sub") == {"NAME1": 10, "NAME2": 2}
+
+    def test_create_args_dict_missing_entry(self, obj_super_init):
+        assert obj_super_init._create_args_dict(["NAME5", "NAME2"]) == {"NAME2": 2}
+        assert obj_super_init._create_args_dict(["NAME4", "NAME2"]) == {"NAME2": 2}
diff --git a/test/test_modules/test_run_environment.py b/test/test_modules/test_run_environment.py
new file mode 100644
index 0000000000000000000000000000000000000000..ce5f995e54df1ffc93c80c191376347c5f0b3741
--- /dev/null
+++ b/test/test_modules/test_run_environment.py
@@ -0,0 +1,31 @@
+import logging
+
+from src.helpers import TimeTracking, PyTestRegex
+from src.modules.run_environment import RunEnvironment
+
+
+class TestRunEnvironment:
+
+    def test_enter(self, caplog):
+        caplog.set_level(logging.INFO)
+        with RunEnvironment() as r:
+            assert caplog.record_tuples[-1] == ('root', 20, 'RunEnvironment started')
+            assert isinstance(r.time, TimeTracking)
+
+    def test_exit(self, caplog):
+        caplog.set_level(logging.INFO)
+        with RunEnvironment() as r:
+            r.do_stuff(0.1)
+        assert caplog.record_tuples[-1] == ('root', 20, PyTestRegex(r"RunEnvironment finished after \d+\.\d+s"))
+
+    def test_init(self, caplog):
+        caplog.set_level(logging.INFO)
+        r = RunEnvironment()
+        assert caplog.record_tuples[-1] == ('root', 20, 'RunEnvironment started')
+
+    def test_del(self, caplog):
+        caplog.set_level(logging.INFO)
+        r = RunEnvironment()
+        r.do_stuff(0.2)
+        del r
+        assert caplog.record_tuples[-1] == ('root', 20, PyTestRegex(r"RunEnvironment finished after \d+\.\d+s"))
diff --git a/test/test_statistics.py b/test/test_statistics.py
index d31f4e9919da27e679019b892d98557dee9a7f1d..fa7de6022f0957031f90fbf951679594583eccab 100644
--- a/test/test_statistics.py
+++ b/test/test_statistics.py
@@ -7,9 +7,9 @@ from src.statistics import standardise, standardise_inverse, centre, centre_inverse
 
 @pytest.fixture(scope='module')
 def input_data():
-    return np.array([np.random.normal(2, 2, 2000),
-                     np.random.normal(-5, 3, 2000),
-                     np.random.normal(10, 1, 2000)]).T
+    return np.array([np.random.normal(2, 2, 3000),
+                     np.random.normal(-5, 3, 3000),
+                     np.random.normal(10, 1, 3000)]).T
 
 
 @pytest.fixture(scope='module')