diff --git a/.gitignore b/.gitignore
index cec17a77983bc017ef057b020600d15922e61f23..f7793d5f492cced655aeb62a8c29af48ac3e452e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -58,4 +58,8 @@ htmlcov/
 /test/test_modules/data/
 report.html
 /TestExperiment/
-/testrun_network/
+/testrun_network*/
+
+# secret variables #
+####################
+/src/join_settings.py
\ No newline at end of file
diff --git a/README.md b/README.md
index e49362e95e9a69c159a5b8d857ccb336cf58d3c6..31365da89169cfe2be58de89a574ae4b69e40224 100644
--- a/README.md
+++ b/README.md
@@ -12,4 +12,12 @@
 and [Network In Network (Lin et al., 2014)](https://arxiv.org/abs/1312.4400).
 # Installation
 * Install __proj__ on your machine using the console. E.g. for opensuse / leap `zypper install proj`
-* c++ compiler required for cartopy installation
\ No newline at end of file
+* C++ compiler required for cartopy installation
+
+# Security
+
+* To use hourly data from ToarDB via the JOIN interface, a private token is required. Request your personal access
+token and add it to `src/join_settings.py` in the hourly data section. Replace the `TOAR_SERVICE_URL` and the
+`Authorization` value. To make sure that this **sensitive** data is not uploaded to the remote server, use the
+following command to prevent git from tracking this file:
+`git update-index --assume-unchanged src/join_settings.py`
\ No newline at end of file
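For reference, a filled-in `src/join_settings.py` might look like the sketch below. The committed version further down in this diff deliberately ships empty headers; the token scheme in the `Authorization` value and the placeholder token are assumptions and must be replaced according to the instructions that come with the personal access token.

    def join_settings(sampling="daily"):
        if sampling == "daily":
            TOAR_SERVICE_URL = 'https://join.fz-juelich.de/services/rest/surfacedata/'
            headers = {}
        elif sampling == "hourly":
            # hypothetical values: fill in the URL and token from your access token instructions
            TOAR_SERVICE_URL = 'https://join.fz-juelich.de/services/rest/surfacedata/'
            headers = {"Authorization": "Token <your-private-token>"}
        else:
            raise NameError(f"Given sampling {sampling} is not supported, choose from either daily or hourly sampling.")
        return TOAR_SERVICE_URL, headers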
string") + args = parser.parse_args(["--experiment_date", "testrun"]) + + main(args) diff --git a/src/.gitignore b/src/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..8e2358dc56797578fe0de020aa827b1fef8663bf --- /dev/null +++ b/src/.gitignore @@ -0,0 +1 @@ +join_settings.py \ No newline at end of file diff --git a/src/data_handling/bootstraps.py b/src/data_handling/bootstraps.py index 6ac33e2a2555fe3f253593423e9e71e0aa97f4af..8690785659ab256fc78b4cfe8701461f67236a9b 100644 --- a/src/data_handling/bootstraps.py +++ b/src/data_handling/bootstraps.py @@ -6,9 +6,69 @@ from src.run_modules.run_environment import RunEnvironment from src.data_handling.data_generator import DataGenerator import numpy as np import logging +import dask.array as da import xarray as xr import os import re +from src import helpers + + +class BootStrapGenerator: + + def __init__(self, orig_generator, boots, chunksize, bootstrap_path): + self.orig_generator: DataGenerator = orig_generator + self.stations = self.orig_generator.stations + self.variables = self.orig_generator.variables + self.boots = boots + self.chunksize = chunksize + self.bootstrap_path = bootstrap_path + self._iterator = 0 + + def __len__(self): + """ + display the number of stations + """ + return len(self.orig_generator)*self.boots*len(self.variables) + + # def __iter__(self): + # """ + # Define the __iter__ part of the iterator protocol to iterate through this generator. Sets the private attribute + # `_iterator` to 0. + # :return: + # """ + # self._iterator = 0 + # return self + + def __iter__(self): + """ + This is the implementation of the __next__ method of the iterator protocol. Get the data generator, and return + the history and label data of this generator. + :return: + """ + while True: + for i, data in enumerate(self.orig_generator): + station = self.orig_generator.get_station_key(i) + logging.info(f"station: {station}") + hist, label = data + shuffled_data = self.load_boot_data(station) + for var in self.variables: + logging.info(f" var: {var}") + for boot in range(self.boots): + logging.debug(f"boot: {boot}") + boot_hist = hist.sel(variables=helpers.list_pop(self.variables, var)) + shuffled_var = shuffled_data.sel(variables=var, boots=boot).expand_dims("variables").drop("boots").transpose("datetime", "window", "Stations", "variables") + boot_hist = boot_hist.combine_first(shuffled_var) + boot_hist = boot_hist.sortby("variables") + yield boot_hist, label + return + + def load_boot_data(self, station): + files = os.listdir(self.bootstrap_path) + regex = re.compile(rf"{station}_\w*\.nc") + file_name = os.path.join(self.bootstrap_path, list(filter(regex.search, files))[0]) + # shuffled_data = xr.open_dataarray(file_name, chunks=self.chunksize) + shuffled_data = xr.open_dataarray(file_name, chunks=100) + return shuffled_data class BootStraps(RunEnvironment): @@ -17,9 +77,17 @@ class BootStraps(RunEnvironment): super().__init__() self.test_data: DataGenerator = self.data_store.get("generator", "general.test") - self.number_bootstraps = 200 + self.number_bootstraps = 10 self.bootstrap_path = self.data_store.get("bootstrap_path", "general") + self.chunks = self.get_chunk_size() self.create_shuffled_data() + bsg =BootStrapGenerator(self.test_data, self.number_bootstraps, self.chunks, self.bootstrap_path) + for bs in bsg: + hist, label = bs + + def get_chunk_size(self): + hist, _ = self.test_data[0] + return (100, *hist.shape[1:], self.number_bootstraps) def create_shuffled_data(self): """ @@ -38,11 +106,12 @@ class 
diff --git a/src/data_handling/bootstraps.py b/src/data_handling/bootstraps.py
index 6ac33e2a2555fe3f253593423e9e71e0aa97f4af..8690785659ab256fc78b4cfe8701461f67236a9b 100644
--- a/src/data_handling/bootstraps.py
+++ b/src/data_handling/bootstraps.py
@@ -6,9 +6,69 @@ from src.run_modules.run_environment import RunEnvironment
 from src.data_handling.data_generator import DataGenerator
 import numpy as np
 import logging
+import dask.array as da
 import xarray as xr
 import os
 import re
+from src import helpers
+
+
+class BootStrapGenerator:
+
+    def __init__(self, orig_generator, boots, chunksize, bootstrap_path):
+        self.orig_generator: DataGenerator = orig_generator
+        self.stations = self.orig_generator.stations
+        self.variables = self.orig_generator.variables
+        self.boots = boots
+        self.chunksize = chunksize
+        self.bootstrap_path = bootstrap_path
+        self._iterator = 0
+
+    def __len__(self):
+        """
+        return the total number of bootstrap realisations (stations * boots * variables)
+        """
+        return len(self.orig_generator)*self.boots*len(self.variables)
+
+    # def __iter__(self):
+    #     """
+    #     Define the __iter__ part of the iterator protocol to iterate through this generator. Sets the private attribute
+    #     `_iterator` to 0.
+    #     :return:
+    #     """
+    #     self._iterator = 0
+    #     return self
+
+    def __iter__(self):
+        """
+        Iterate over all stations, variables and bootstrap realisations: for each combination, yield the history data
+        with the current variable replaced by its shuffled (bootstrapped) version, together with the labels.
+        :return:
+        """
+        while True:
+            for i, data in enumerate(self.orig_generator):
+                station = self.orig_generator.get_station_key(i)
+                logging.info(f"station: {station}")
+                hist, label = data
+                shuffled_data = self.load_boot_data(station)
+                for var in self.variables:
+                    logging.info(f" var: {var}")
+                    for boot in range(self.boots):
+                        logging.debug(f"boot: {boot}")
+                        boot_hist = hist.sel(variables=helpers.list_pop(self.variables, var))
+                        shuffled_var = shuffled_data.sel(variables=var, boots=boot).expand_dims("variables").drop("boots").transpose("datetime", "window", "Stations", "variables")
+                        boot_hist = boot_hist.combine_first(shuffled_var)
+                        boot_hist = boot_hist.sortby("variables")
+                        yield boot_hist, label
+            return
+
+    def load_boot_data(self, station):
+        files = os.listdir(self.bootstrap_path)
+        regex = re.compile(rf"{station}_\w*\.nc")
+        file_name = os.path.join(self.bootstrap_path, list(filter(regex.search, files))[0])
+        # shuffled_data = xr.open_dataarray(file_name, chunks=self.chunksize)
+        shuffled_data = xr.open_dataarray(file_name, chunks=100)
+        return shuffled_data
 
 
 class BootStraps(RunEnvironment):
@@ -17,9 +77,17 @@
         super().__init__()
 
         self.test_data: DataGenerator = self.data_store.get("generator", "general.test")
-        self.number_bootstraps = 200
+        self.number_bootstraps = 10
         self.bootstrap_path = self.data_store.get("bootstrap_path", "general")
+        self.chunks = self.get_chunk_size()
         self.create_shuffled_data()
+        bsg = BootStrapGenerator(self.test_data, self.number_bootstraps, self.chunks, self.bootstrap_path)
+        for bs in bsg:
+            hist, label = bs
+
+    def get_chunk_size(self):
+        hist, _ = self.test_data[0]
+        return (100, *hist.shape[1:], self.number_bootstraps)
 
     def create_shuffled_data(self):
         """
@@ -38,11 +106,12 @@ class BootStraps(RunEnvironment):
                 file_name = f"{station}_{variables_str}_hist{window}_nboots{nboot}_shuffled.nc"
                 file_path = os.path.join(self.bootstrap_path, file_name)
                 data = data.expand_dims({'boots': range(nboot)}, axis=-1)
-                shuffled_variable = np.full(data.shape, np.nan)
+                shuffled_variable = []
                 for i, var in enumerate(data.coords['variables']):
                     single_variable = data.sel(variables=var).values
-                    shuffled_variable[..., i, :] = self.shuffle_single_variable(single_variable)
-                shuffled_data = xr.DataArray(shuffled_variable, coords=data.coords, dims=data.dims)
+                    shuffled_variable.append(self.shuffle_single_variable(single_variable, chunks=(100, *data.shape[1:3], data.shape[-1])))
+                shuffled_variable_da = da.stack(shuffled_variable, axis=-2).rechunk("auto")
+                shuffled_data = xr.DataArray(shuffled_variable_da, coords=data.coords, dims=data.dims)
                 shuffled_data.to_netcdf(file_path)
 
     def valid_bootstrap_file(self, station, variables, window):
@@ -59,7 +128,7 @@ class BootStraps(RunEnvironment):
        :param window:
        :return:
        """
-        regex = re.compile(rf"{station}_{variables}_hist(\d+)_nboots(\d+)_shuffled*")
+        regex = re.compile(rf"{station}_{variables}_hist(\d+)_nboots(\d+)_shuffled")
         max_nboot = self.number_bootstraps
         for file in os.listdir(self.bootstrap_path):
             match = regex.match(file)
@@ -74,10 +143,9 @@ class BootStraps(RunEnvironment):
         return False, max_nboot
 
     @staticmethod
-    def shuffle_single_variable(data: np.ndarray) -> np.ndarray:
-        orig_shape = data.shape
-        size = orig_shape
-        return np.random.choice(data.reshape(-1,), size=size)
+    def shuffle_single_variable(data: da.array, chunks) -> da.array:
+        size = data.shape
+        return da.random.choice(data.reshape(-1,), size=size, chunks=chunks)
 
 
 if __name__ == "__main__":
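The change above swaps the numpy-based shuffling for dask, so arrays larger than memory can be bootstrapped lazily. A minimal, self-contained sketch of the idea behind `shuffle_single_variable` (the array sizes are made up):

    import dask.array as da
    import numpy as np

    # toy history block with shape (datetime, window, stations, boots)
    data = da.from_array(np.arange(2000.0).reshape(100, 5, 1, 4), chunks=(50, 5, 1, 4))

    # draw with replacement from the flattened values; the result keeps the shape
    # of the input, but the temporal structure of the variable is destroyed
    shuffled = da.random.choice(data.reshape(-1,), size=data.shape, chunks=(50, 5, 1, 4))

    print(shuffled.shape)  # (100, 5, 1, 4), still lazy until .compute() is called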
diff --git a/src/data_handling/data_generator.py b/src/data_handling/data_generator.py
index 732a7efdf8f360b49823dfb6ca5ca3239cc774af..7aa24a88a0b80b1d4d2b54973bf02f232184a732 100644
--- a/src/data_handling/data_generator.py
+++ b/src/data_handling/data_generator.py
@@ -1,15 +1,17 @@
 __author__ = 'Felix Kleinert, Lukas Leufen'
 __date__ = '2019-11-07'
 
-import keras
-from src import helpers
-from src.data_handling.data_preparation import DataPrep
 import os
 from typing import Union, List, Tuple, Any
+
+import keras
 import xarray as xr
 import pickle
 import logging
+
+from src import helpers
+from src.data_handling.data_preparation import DataPrep
 
 
 class DataGenerator(keras.utils.Sequence):
     """
@@ -76,8 +78,7 @@ class DataGenerator(keras.utils.Sequence):
             data = self.get_data_generator()
             self._iterator += 1
             if data.history is not None and data.label is not None:  # pragma: no branch
-                return data.history.transpose("datetime", "window", "Stations", "variables"), \
-                       data.label.squeeze("Stations").transpose("datetime", "window")
+                return data.get_transposed_history(), data.get_transposed_label()
             else:
                 self.__next__()  # pragma: no cover
         else:
@@ -90,8 +91,7 @@ class DataGenerator(keras.utils.Sequence):
        :return: The generator's time series of history data and its labels
        """
        data = self.get_data_generator(key=item)
-        return data.history.transpose("datetime", "window", "Stations", "variables"), \
-               data.label.squeeze("Stations").transpose("datetime", "window")
+        return data.get_transposed_history(), data.get_transposed_label()
 
     def get_data_generator(self, key: Union[str, int] = None, local_tmp_storage: bool = True) -> DataPrep:
         """
@@ -124,7 +124,10 @@ class DataGenerator(keras.utils.Sequence):
        Save given data locally as .pickle in self.data_path_tmp with name '<station>_<var1>_<var2>_..._<varX>.pickle'
        :param data: any data, that should be saved
        """
-        file = os.path.join(self.data_path_tmp, f"{''.join(data.station)}_{'_'.join(sorted(data.variables))}.pickle")
+        date = f"{self.kwargs.get('start')}_{self.kwargs.get('end')}"
+        vars = '_'.join(sorted(data.variables))
+        station = ''.join(data.station)
+        file = os.path.join(self.data_path_tmp, f"{station}_{vars}_{date}_.pickle")
         with open(file, "wb") as f:
             pickle.dump(data, f)
         logging.debug(f"save pickle data to {file}")
@@ -136,7 +139,10 @@ class DataGenerator(keras.utils.Sequence):
        :param variables: list of variables to load
        :return: loaded data
        """
-        file = os.path.join(self.data_path_tmp, f"{''.join(station)}_{'_'.join(sorted(variables))}.pickle")
+        date = f"{self.kwargs.get('start')}_{self.kwargs.get('end')}"
+        vars = '_'.join(sorted(variables))
+        station = ''.join(station)
+        file = os.path.join(self.data_path_tmp, f"{station}_{vars}_{date}_.pickle")
         with open(file, "rb") as f:
             data = pickle.load(f)
         logging.debug(f"load pickle data from {file}")
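The temporary pickle cache name now also encodes the requested time range, so runs with different `start`/`end` settings no longer collide on the same file. With hypothetical settings, the file name is assembled like this:

    import os

    kwargs = {"start": "1997-01-01", "end": "2017-12-31"}  # hypothetical experiment settings
    station, variables = ["DEBW107"], ["temp", "o3"]

    date = f"{kwargs.get('start')}_{kwargs.get('end')}"
    vars = '_'.join(sorted(variables))
    file = os.path.join("data_tmp", f"{''.join(station)}_{vars}_{date}_.pickle")
    print(file)  # data_tmp/DEBW107_o3_temp_1997-01-01_2017-12-31_.pickle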
diff --git a/src/data_handling/data_preparation.py b/src/data_handling/data_preparation.py
index c39625b1e02506696ee5b4c13ac86c7e73420acf..490515aafaf51044ffb1121d276a3bdec4912fff 100644
--- a/src/data_handling/data_preparation.py
+++ b/src/data_handling/data_preparation.py
@@ -1,15 +1,17 @@
 __author__ = 'Felix Kleinert, Lukas Leufen'
 __date__ = '2019-10-16'
 
-import xarray as xr
-import pandas as pd
-import numpy as np
+import datetime as dt
 import logging
 import os
+from typing import Union, List, Iterable
+
+import numpy as np
+import pandas as pd
+import xarray as xr
+
 from src import join, helpers
 from src import statistics
-from typing import Union, List, Iterable
-import datetime as dt
 
 # define a more general date type for type hinting
 date = Union[dt.date, dt.datetime]
@@ -58,12 +60,11 @@ class DataPrep(object):
         self.meta = None
         self._transform_method = None
         self.statistics_per_var = kwargs.get("statistics_per_var", None)
-        if self.statistics_per_var is not None:
+        self.sampling = kwargs.get("sampling", "daily")
+        if self.statistics_per_var is not None or self.sampling == "hourly":
             self.load_data()
         else:
-            raise NotImplementedError  # hourly data usage is not implemented yet
-            # self.data, self.meta = Fkf.read_hourly_data_from_csv_to_xarray(self.path, self.network, self.station,
-            #                                                                self.variables, **kwargs)
+            raise NotImplementedError("Either select hourly data or provide statistics_per_var.")
 
     def load_data(self):
         """
@@ -122,7 +123,7 @@ class DataPrep(object):
         """
         df_all = {}
         df, meta = join.download_join(station_name=self.station, stat_var=self.statistics_per_var,
-                                      station_type=self.station_type, network_name=self.network)
+                                      station_type=self.station_type, network_name=self.network, sampling=self.sampling)
         df_all[self.station[0]] = df
         # convert df_all to xarray
         xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
@@ -385,6 +386,13 @@ class DataPrep(object):
         data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum)
         return data
 
+    def get_transposed_history(self):
+        if self.history is not None:
+            return self.history.transpose("datetime", "window", "Stations", "variables").copy()
+
+    def get_transposed_label(self):
+        return self.label.squeeze("Stations").transpose("datetime", "window").copy()
+
 
 if __name__ == "__main__":
     dp = DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
diff --git a/src/datastore.py b/src/datastore.py
index 69e486796cc4320d88b3c3f5fd4a970d4ee814e9..d9f844ff97acb3f5c6600205f91100219d9c53e6 100644
--- a/src/datastore.py
+++ b/src/datastore.py
@@ -2,8 +2,8 @@ __author__ = 'Lukas Leufen'
 __date__ = '2019-11-22'
 
-from typing import Any, List, Tuple, Dict
 from abc import ABC
+from typing import Any, List, Tuple, Dict
 
 
 class NameNotFoundInDataStore(Exception):
diff --git a/src/helpers.py b/src/helpers.py
index 680d3bd12132065763cb1f311feaca32bf7c75a8..8a50b0e723d28652e1eb7e27c53636b506774b74 100644
--- a/src/helpers.py
+++ b/src/helpers.py
@@ -96,24 +96,24 @@ class TimeTracking(object):
         logging.info(f"undefined job finished after {self}")
 
 
-def prepare_host(create_new=True):
+def prepare_host(create_new=True, sampling="daily"):
     hostname = socket.gethostname()
     try:
         user = os.getlogin()
     except OSError:
         user = "default"
-    if hostname == 'ZAM144':
-        path = f'/home/{user}/Data/toar_daily/'
-    elif hostname == 'zam347':
-        path = f'/home/{user}/Data/toar_daily/'
-    elif hostname == 'linux-aa9b':
-        path = f'/home/{user}/machinelearningtools/data/toar_daily/'
-    elif (len(hostname) > 2) and (hostname[:2] == 'jr'):
-        path = f'/p/project/cjjsc42/{user}/DATA/toar_daily/'
-    elif (len(hostname) > 2) and (hostname[:2] == 'jw'):
-        path = f'/p/home/jusers/{user}/juwels/intelliaq/DATA/toar_daily/'
+    if hostname == "ZAM144":
+        path = f"/home/{user}/Data/toar_{sampling}/"
+    elif hostname == "zam347":
+        path = f"/home/{user}/Data/toar_{sampling}/"
+    elif hostname == "linux-aa9b":
+        path = f"/home/{user}/machinelearningtools/data/toar_{sampling}/"
+    elif (len(hostname) > 2) and (hostname[:2] == "jr"):
+        path = f"/p/project/cjjsc42/{user}/DATA/toar_{sampling}/"
+    elif (len(hostname) > 2) and (hostname[:2] == "jw"):
+        path = f"/p/home/jusers/{user}/juwels/intelliaq/DATA/toar_{sampling}/"
     elif "runner-6HmDp9Qd-project-2411-concurrent" in hostname:
-        path = f'/home/{user}/machinelearningtools/data/toar_daily/'
+        path = f"/home/{user}/machinelearningtools/data/toar_{sampling}/"
     else:
         logging.error(f"unknown host '{hostname}'")
         raise OSError(f"unknown host '{hostname}'")
@@ -132,12 +132,14 @@ def prepare_host(create_new=True):
     return path
 
 
-def set_experiment_name(experiment_date=None, experiment_path=None):
+def set_experiment_name(experiment_date=None, experiment_path=None, sampling=None):
     if experiment_date is None:
         experiment_name = "TestExperiment"
     else:
         experiment_name = f"{experiment_date}_network"
+    if sampling == "hourly":
+        experiment_name += f"_{sampling}"
     if experiment_path is None:
         experiment_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", experiment_name))
     else:
@@ -195,3 +197,13 @@ def float_round(number: float, decimals: int = 0, round_type: Callable = math.ce
     """
     multiplier = 10. ** decimals
     return round_type(number * multiplier) / multiplier
+
+
+def list_pop(list_full: list, pop_items):
+    pop_items = to_list(pop_items)
+    if len(pop_items) > 1:
+        return [e for e in list_full if e not in pop_items]
+    else:
+        list_pop = list_full.copy()
+        list_pop.remove(pop_items[0])
+        return list_pop
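The new `list_pop` helper returns a copy of the list with the given item(s) removed and leaves the input untouched (it is used earlier in this diff by `BootStrapGenerator.__iter__` to drop the currently bootstrapped variable). A short usage sketch:

    from src.helpers import list_pop

    variables = ["o3", "temp", "relhum"]

    print(list_pop(variables, "temp"))            # ['o3', 'relhum']
    print(list_pop(variables, ["o3", "relhum"]))  # ['temp']
    print(variables)                              # unchanged: ['o3', 'temp', 'relhum']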
diff --git a/src/join.py b/src/join.py
index 43271a7b0525b5d829ea761019176197c78c5468..351060f7bf4949801f94b04c13e3881f008389b6 100644
--- a/src/join.py
+++ b/src/join.py
@@ -2,14 +2,17 @@ __author__ = 'Felix Kleinert, Lukas Leufen'
 __date__ = '2019-10-16'
 
-import requests
-import logging
-import pandas as pd
 import datetime as dt
+import logging
 from typing import Iterator, Union, List, Dict
+
+import pandas as pd
+import requests
+
 from src import helpers
+from src.join_settings import join_settings
 
-join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
+# join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
 
 str_or_none = Union[str, None]
@@ -21,7 +24,7 @@ class EmptyQueryResult(Exception):
 
 
 def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None,
-                  network_name: str = None) -> [pd.DataFrame, pd.DataFrame]:
+                  network_name: str = None, sampling: str = "daily") -> [pd.DataFrame, pd.DataFrame]:
     """
     read data from JOIN/TOAR
 
@@ -29,6 +32,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
    :param stat_var: key as variable like 'O3', values as statistics on keys like 'mean'
    :param station_type: set the station type like "traffic" or "background", can be none
    :param network_name: set the measurement network like "UBA" or "AIRBASE", can be none
+    :param sampling: sampling rate of the downloaded data, either set to daily or hourly (default daily)
    :returns:
        - df - data frame with all variables and statistics
        - meta - data frame with all meta information
@@ -36,8 +40,15 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
     # make sure station_name parameter is a list
     station_name = helpers.to_list(station_name)
 
+    # get data connection settings
+    join_url_base, headers = join_settings(sampling)
+
     # load series information
-    vars_dict = load_series_information(station_name, station_type, network_name)
+    vars_dict = load_series_information(station_name, station_type, network_name, join_url_base, headers)
+
+    # correct stat_var values if data is not aggregated (hourly)
+    if sampling == "hourly":
+        [stat_var.update({k: "values"}) for k in stat_var.keys()]
 
     # download all variables with given statistic
     data = None
@@ -49,10 +60,16 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
 
         # create data link
         opts = {'base': join_url_base, 'service': 'stats', 'id': vars_dict[var], 'statistics': stat_var[var],
-                'sampling': 'daily', 'capture': 0, 'min_data_length': 1460}
+                'sampling': sampling, 'capture': 0, 'min_data_length': 1460, 'format': 'json'}
 
         # load data
-        data = get_data(opts)
+        data = get_data(opts, headers)
+
+        # adjust data format if given as list of lists
+        # no branch coverage because this only happens when downloading hourly data using a secret token, which is
+        # not available during CI testing
+        if isinstance(data, list):  # pragma: no branch
+            data = correct_data_format(data)
 
         # correct namespace of statistics
         stat = _correct_stat_name(stat_var[var])
@@ -70,30 +87,51 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
     raise EmptyQueryResult("No data found in JOIN.")
 
 
-def get_data(opts: Dict) -> Union[Dict, List]:
+def correct_data_format(data):
+    """
+    Transform to the standard data format. For some cases (e.g. hourly data), the data is returned as list instead of
+    a dictionary with keys datetime, values and metadata. This function addresses this issue and transforms the data
+    into the dictionary version.
+    :param data: data in hourly format
+    :return: the same data but formatted to fit with aggregated format
+    """
+    formatted = {"datetime": [],
+                 "values": [],
+                 "metadata": data[-1]}
+    for d in data[:-1]:
+        for k, v in zip(["datetime", "values"], d):
+            formatted[k].append(v)
+    return formatted
+
+
+def get_data(opts: Dict, headers: Dict) -> Union[Dict, List]:
     """
     Download join data using requests framework. Data is returned as json like structure. Depending on the response
     structure, this can lead to a list or dictionary.
     :param opts: options to create the request url
+    :param headers: additional headers information like authorization, can be empty
     :return: requested data (either as list or dictionary)
     """
     url = create_url(**opts)
-    response = requests.get(url)
+    response = requests.get(url, headers=headers)
     return response.json()
 
 
-def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none) -> Dict:
+def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none,
+                            join_url_base: str, headers: Dict) -> Dict:
     """
     List all series ids that are available for given station id and network name.
     :param station_name: Station name e.g. DEBW107
    :param station_type: station type like "traffic" or "background"
    :param network_name: measurement network of the station like "UBA" or "AIRBASE"
+    :param join_url_base: base url name to download data from
+    :param headers: additional headers information like authorization, can be empty
    :return: all available series for requested station stored in an dictionary with parameter name (variable) as key
        and the series id as value.
    """
     opts = {"base": join_url_base, "service": "series", "station_id": station_name[0], "station_type": station_type,
             "network_name": network_name}
-    station_vars = get_data(opts)
+    station_vars = get_data(opts, headers)
     vars_dict = {item[3].lower(): item[0] for item in station_vars}
     return vars_dict
@@ -107,7 +145,11 @@ def _save_to_pandas(df: Union[pd.DataFrame, None], data: dict, stat: str, var: s
    :param var: variable the data is from (e.g. 'o3')
    :return: new created or concatenated data frame
    """
-    index = map(lambda s: dt.datetime.strptime(s, "%Y-%m-%d %H:%M"), data['datetime'])
+    if len(data["datetime"][0]) == 19:
+        str_format = "%Y-%m-%d %H:%M:%S"
+    else:
+        str_format = "%Y-%m-%d %H:%M"
+    index = map(lambda s: dt.datetime.strptime(s, str_format), data['datetime'])
     if df is None:
         df = pd.DataFrame(data[stat], index=index, columns=[var])
     else:
@@ -151,8 +193,10 @@ def create_url(base: str, service: str, **kwargs: Union[str, int, float, None])
 
 
 if __name__ == "__main__":
+    logging.basicConfig(level=logging.DEBUG)
     var_all_dic = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
                    'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
                    'pblheight': 'maximum'}
     station = 'DEBW107'
-    download_join(station, var_all_dic)
+    # download_join(station, var_all_dic, sampling="daily")
+    download_join(station, var_all_dic, sampling="hourly")
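For illustration, `correct_data_format` reshapes the list-of-lists payload of the hourly endpoint (rows of [datetime, value] with the metadata element last, as described in its docstring) into the dictionary layout of the aggregated endpoint. The sample payload below is made up:

    raw = [["2020-01-01 00:00:00", 31.0],
           ["2020-01-01 01:00:00", 29.5],
           {"station": "DEBW107"}]

    formatted = {"datetime": [], "values": [], "metadata": raw[-1]}
    for d in raw[:-1]:
        for k, v in zip(["datetime", "values"], d):
            formatted[k].append(v)

    print(formatted["datetime"])  # ['2020-01-01 00:00:00', '2020-01-01 01:00:00']
    print(formatted["values"])    # [31.0, 29.5]

Note that the 19-character timestamps of this format are exactly what the new `%Y-%m-%d %H:%M:%S` branch in `_save_to_pandas` detects.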
diff --git a/src/join_settings.py b/src/join_settings.py
new file mode 100644
index 0000000000000000000000000000000000000000..365e8f39d25b28375eadf3b0dbda374feb5b158e
--- /dev/null
+++ b/src/join_settings.py
@@ -0,0 +1,11 @@
+
+def join_settings(sampling="daily"):
+    if sampling == "daily":  # pragma: no branch
+        TOAR_SERVICE_URL = 'https://join.fz-juelich.de/services/rest/surfacedata/'
+        headers = {}
+    elif sampling == "hourly":
+        TOAR_SERVICE_URL = 'https://join.fz-juelich.de/services/rest/surfacedata/'
+        headers = {}
+    else:
+        raise NameError(f"Given sampling {sampling} is not supported, choose from either daily or hourly sampling.")
+    return TOAR_SERVICE_URL, headers
diff --git a/src/model_modules/flatten.py b/src/model_modules/flatten.py
index 707b0d4421fd506d16f93d7c56406abc878ab9f1..bbe92472ebb48e7486dede099dc098a161f51695 100644
--- a/src/model_modules/flatten.py
+++ b/src/model_modules/flatten.py
@@ -1,9 +1,10 @@
 __author__ = "Felix Kleinert, Lukas Leufen"
 __date__ = '2019-12-02'
 
-import keras
 from typing import Callable
 
+import keras
+
 
 def flatten_tail(input_X: keras.layers, name: str, bound_weight: bool = False, dropout_rate: float = 0.0,
                  window_lead_time: int = 4, activation: Callable = keras.activations.relu,
diff --git a/src/model_modules/inception_model.py b/src/model_modules/inception_model.py
index 126dc15320f4b0ac7ff660b709601f083700d01f..11093df56a4b262f75eae7a6f7c05e6e44e1435d 100644
--- a/src/model_modules/inception_model.py
+++ b/src/model_modules/inception_model.py
@@ -1,9 +1,10 @@
 __author__ = 'Felix Kleinert, Lukas Leufen'
 __date__ = '2019-10-22'
 
+import logging
+
 import keras
 import keras.layers as layers
-import logging
 
 
 class InceptionModelBase:
""" - self.callbacks = callbacks + self.callbacks = helpers.to_list(callbacks) def on_epoch_end(self, epoch, logs=None): """ @@ -139,12 +141,73 @@ class ModelCheckpointAdvanced(ModelCheckpoint): if self.save_best_only: current = logs.get(self.monitor) if current == self.best: - if self.verbose > 0: + if self.verbose > 0: # pragma: no branch print('\nEpoch %05d: save to %s' % (epoch + 1, file_path)) with open(file_path, "wb") as f: pickle.dump(callback["callback"], f) else: with open(file_path, "wb") as f: - if self.verbose > 0: + if self.verbose > 0: # pragma: no branch print('\nEpoch %05d: save to %s' % (epoch + 1, file_path)) pickle.dump(callback["callback"], f) + + +class CallbackHandler: + + def __init__(self): + self.__callbacks = [] + self._checkpoint = None + self.editable = True + + @property + def _callbacks(self): + return [{"callback": clbk[clbk["name"]], "path": clbk["path"]} for clbk in self.__callbacks] + + @_callbacks.setter + def _callbacks(self, value): + name, callback, callback_path = value + self.__callbacks.append({"name": name, name: callback, "path": callback_path}) + + def _update_callback(self, pos, value): + name = self.__callbacks[pos]["name"] + self.__callbacks[pos][name] = value + + def add_callback(self, callback, callback_path, name="callback"): + if self.editable: + self._callbacks = (name, callback, callback_path) + else: + raise PermissionError(f"{__class__.__name__} is protected and cannot be edited.") + + def get_callbacks(self, as_dict=True): + if as_dict: + return self._get_callbacks() + else: + return [clb["callback"] for clb in self._get_callbacks()] + + def get_callback_by_name(self, obj_name): + if obj_name != "callback": + return [clbk[clbk["name"]] for clbk in self.__callbacks if clbk["name"] == obj_name][0] + + def _get_callbacks(self): + clbks = self._callbacks + if self._checkpoint is not None: + clbks += [{"callback": self._checkpoint, "path": self._checkpoint.filepath}] + return clbks + + def get_checkpoint(self): + if self._checkpoint is not None: + return self._checkpoint + + def create_model_checkpoint(self, **kwargs): + self._checkpoint = ModelCheckpointAdvanced(callbacks=self._callbacks, **kwargs) + self.editable = False + + def load_callbacks(self): + for pos, callback in enumerate(self.__callbacks): + path = callback["path"] + clb = pickle.load(open(path, "rb")) + self._update_callback(pos, clb) + + def update_checkpoint(self, history_name="hist"): + self._checkpoint.update_callbacks(self._callbacks) + self._checkpoint.update_best(self.get_callback_by_name(history_name)) diff --git a/src/model_modules/linear_model.py b/src/model_modules/linear_model.py index 17a9b2326ab6ba1829ee4f65f0161de887e70778..3d5323e1b0303b497c1f26c4e84ee9b968380425 100644 --- a/src/model_modules/linear_model.py +++ b/src/model_modules/linear_model.py @@ -2,8 +2,8 @@ __author__ = "Felix Kleinert, Lukas Leufen" __date__ = '2019-12-11' -import statsmodels.api as sm import numpy as np +import statsmodels.api as sm class OrdinaryLeastSquaredModel: @@ -32,7 +32,7 @@ class OrdinaryLeastSquaredModel: def predict(self, data): data = sm.add_constant(self.reshape_xarray_to_numpy(data)) - return self.model.predict(data) + return np.atleast_2d(self.model.predict(data)) @staticmethod def reshape_xarray_to_numpy(data): diff --git a/src/model_modules/model_class.py b/src/model_modules/model_class.py index 02f43dc1b208cfd8a52a937298217216f26fbdb6..ebbd7a25cef9031436d932a6502c9726bfe3e318 100644 --- a/src/model_modules/model_class.py +++ b/src/model_modules/model_class.py @@ 
diff --git a/src/model_modules/linear_model.py b/src/model_modules/linear_model.py
index 17a9b2326ab6ba1829ee4f65f0161de887e70778..3d5323e1b0303b497c1f26c4e84ee9b968380425 100644
--- a/src/model_modules/linear_model.py
+++ b/src/model_modules/linear_model.py
@@ -2,8 +2,8 @@ __author__ = "Felix Kleinert, Lukas Leufen"
 __date__ = '2019-12-11'
 
-import statsmodels.api as sm
 import numpy as np
+import statsmodels.api as sm
 
 
 class OrdinaryLeastSquaredModel:
@@ -32,7 +32,7 @@ class OrdinaryLeastSquaredModel:
 
     def predict(self, data):
         data = sm.add_constant(self.reshape_xarray_to_numpy(data))
-        return self.model.predict(data)
+        return np.atleast_2d(self.model.predict(data))
 
     @staticmethod
     def reshape_xarray_to_numpy(data):
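The `np.atleast_2d` wrapper above guards the degenerate case where the OLS prediction collapses to a 1-D vector (a single sample), so downstream code can always rely on two dimensions:

    import numpy as np

    single = np.array([1.5, 2.0, 0.7])                    # one sample
    batch = np.array([[1.5, 2.0, 0.7], [1.1, 1.9, 0.8]])  # two samples

    print(np.atleast_2d(single).shape)  # (1, 3) -- promoted
    print(np.atleast_2d(batch).shape)   # (2, 3) -- already 2-D, unchanged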
diff --git a/src/model_modules/model_class.py b/src/model_modules/model_class.py
index 02f43dc1b208cfd8a52a937298217216f26fbdb6..ebbd7a25cef9031436d932a6502c9726bfe3e318 100644
--- a/src/model_modules/model_class.py
+++ b/src/model_modules/model_class.py
@@ -8,8 +8,9 @@ from abc import ABC
 from typing import Any, Callable
 
 import keras
-
-from src import helpers
+from src.model_modules.inception_model import InceptionModelBase
+from src.model_modules.flatten import flatten_tail
+from src.model_modules.keras_extensions import LearningRateDecay
 
 
 class AbstractModelClass(ABC):
@@ -29,6 +29,7 @@ class AbstractModelClass(ABC):
 
         self.__model = None
         self.__loss = None
+        self.model_name = self.__class__.__name__
 
     def __getattr__(self, name: str) -> Any:
 
@@ -241,3 +242,105 @@ class MyBranchedModel(AbstractModelClass):
 
         self.loss = [keras.losses.mean_absolute_error] + [keras.losses.mean_squared_error] + \
                     [keras.losses.mean_squared_error]
+
+
+class MyTowerModel(AbstractModelClass):
+
+    def __init__(self, window_history_size, window_lead_time, channels):
+
+        """
+        Sets model and loss depending on the given arguments. All remaining hyperparameters (dropout rate, regularizer,
+        learning rate, optimizer, epochs, batch size and activation) are fixed inside this method.
+        :param window_history_size: number of historical time steps included in the input data
+        :param window_lead_time: number of time steps to forecast in the output layer
+        :param channels: number of variables used in input data
+        """
+
+        super().__init__()
+
+        # settings
+        self.window_history_size = window_history_size
+        self.window_lead_time = window_lead_time
+        self.channels = channels
+        self.dropout_rate = 1e-2
+        self.regularizer = keras.regularizers.l2(0.1)
+        self.initial_lr = 1e-2
+        self.optimizer = keras.optimizers.adam(lr=self.initial_lr)
+        self.lr_decay = LearningRateDecay(base_lr=self.initial_lr, drop=.94, epochs_drop=10)
+        self.epochs = 20
+        self.batch_size = int(256*4)
+        self.activation = keras.layers.PReLU
+
+        # apply to model
+        self.set_model()
+        self.set_loss()
+
+    def set_model(self):
+
+        """
+        Build the model: three inception blocks with dropout in between, followed by a flatten tail. All settings are
+        taken from the instance attributes defined in __init__.
+        """
+        activation = self.activation
+        conv_settings_dict1 = {
+            'tower_1': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (3, 1), 'activation': activation},
+            'tower_2': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (5, 1), 'activation': activation},
+            'tower_3': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (1, 1), 'activation': activation},
+        }
+
+        pool_settings_dict1 = {'pool_kernel': (3, 1), 'tower_filter': 8 * 2, 'activation': activation}
+
+        conv_settings_dict2 = {
+            'tower_1': {'reduction_filter': 8 * 2, 'tower_filter': 16 * 2 * 2, 'tower_kernel': (3, 1),
+                        'activation': activation},
+            'tower_2': {'reduction_filter': 8 * 2, 'tower_filter': 16 * 2 * 2, 'tower_kernel': (5, 1),
+                        'activation': activation},
+            'tower_3': {'reduction_filter': 8 * 2, 'tower_filter': 16 * 2 * 2, 'tower_kernel': (1, 1),
+                        'activation': activation},
+        }
+        pool_settings_dict2 = {'pool_kernel': (3, 1), 'tower_filter': 16, 'activation': activation}
+
+        conv_settings_dict3 = {'tower_1': {'reduction_filter': 16 * 4, 'tower_filter': 32 * 2, 'tower_kernel': (3, 1),
+                                           'activation': activation},
+                               'tower_2': {'reduction_filter': 16 * 4, 'tower_filter': 32 * 2, 'tower_kernel': (5, 1),
+                                           'activation': activation},
+                               'tower_3': {'reduction_filter': 16 * 4, 'tower_filter': 32 * 2, 'tower_kernel': (1, 1),
+                                           'activation': activation},
+                               }
+
+        pool_settings_dict3 = {'pool_kernel': (3, 1), 'tower_filter': 32, 'activation': activation}
+
+        ##########################################
+        inception_model = InceptionModelBase()
+
+        X_input = keras.layers.Input(
+            shape=(self.window_history_size + 1, 1, self.channels))  # add 1 to window_size to include current time step t0
+
+        X_in = inception_model.inception_block(X_input, conv_settings_dict1, pool_settings_dict1,
+                                               regularizer=self.regularizer,
+                                               batch_normalisation=True)
+
+        X_in = keras.layers.Dropout(self.dropout_rate)(X_in)
+
+        X_in = inception_model.inception_block(X_in, conv_settings_dict2, pool_settings_dict2, regularizer=self.regularizer,
+                                               batch_normalisation=True)
+
+        X_in = keras.layers.Dropout(self.dropout_rate)(X_in)
+
+        X_in = inception_model.inception_block(X_in, conv_settings_dict3, pool_settings_dict3, regularizer=self.regularizer,
+                                               batch_normalisation=True)
+        #############################################
+
+        out_main = flatten_tail(X_in, 'Main', activation=activation, bound_weight=True, dropout_rate=self.dropout_rate,
+                                reduction_filter=64, first_dense=64, window_lead_time=self.window_lead_time)
+
+        self.model = keras.Model(inputs=X_input, outputs=[out_main])
+
+    def set_loss(self):
+
+        """
+        Set the loss of this model.
+        :return: nothing, the loss is set as instance attribute
+        """
+
+        self.loss = [keras.losses.mean_squared_error]
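Like the other model classes, `MyTowerModel` only needs the data dimensions at construction time; everything else is fixed inside the class. A hypothetical instantiation (the numbers mirror defaults appearing elsewhere in this diff, e.g. `window_history_size=13`):

    from src.model_modules.model_class import MyTowerModel

    tower = MyTowerModel(window_history_size=13, window_lead_time=4, channels=9)
    keras_model = tower.model  # the built keras.Model; compilation happens later in ModelSetup
    loss = tower.loss          # [keras.losses.mean_squared_error]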
diff --git a/src/plotting/postprocessing_plotting.py b/src/plotting/postprocessing_plotting.py
index 97d326bc87c72142ab20ea95effbd88f490f2937..a41c636b5ab17d2039f7976fca625e9c8e11ce6e 100644
--- a/src/plotting/postprocessing_plotting.py
+++ b/src/plotting/postprocessing_plotting.py
@@ -1,26 +1,25 @@
 __author__ = "Lukas Leufen, Felix Kleinert"
 __date__ = '2019-12-17'
 
-import os
 import logging
 import math
+import os
 import warnings
-from src import helpers
-from src.helpers import TimeTracking
-from src.run_modules.run_environment import RunEnvironment
+from typing import Dict, List, Tuple
 
+import cartopy.crs as ccrs
+import cartopy.feature as cfeature
+import matplotlib
+import matplotlib.pyplot as plt
 import numpy as np
-import xarray as xr
 import pandas as pd
-
-import matplotlib
 import seaborn as sns
-import matplotlib.pyplot as plt
-import cartopy.crs as ccrs
-import cartopy.feature as cfeature
+import xarray as xr
 from matplotlib.backends.backend_pdf import PdfPages
-from typing import Dict, List, Tuple
 
+from src import helpers
+from src.helpers import TimeTracking
+from src.run_modules.run_environment import RunEnvironment
 
 logging.getLogger('matplotlib').setLevel(logging.WARNING)
@@ -51,8 +50,8 @@ class PlotMonthlySummary(RunEnvironment):
 
     def _prepare_data(self, stations: List) -> xr.DataArray:
         """
-        Pre-process data required to plot. For each station, load locally saved predictions, extract the CNN and orig
-        prediction and group them into monthly bins (no aggregation, only sorting them).
+        Pre-process data required to plot. For each station, load locally saved predictions, extract the CNN prediction
+        and the observation and group them into monthly bins (no aggregation, only sorting them).
        :param stations: all stations to plot
        :return: The entire data set, flagged with the corresponding month.
        """
@@ -63,12 +62,13 @@ class PlotMonthlySummary(RunEnvironment):
             data = xr.open_dataarray(file_name)
 
             data_cnn = data.sel(type="CNN").squeeze()
-            data_cnn.coords["ahead"].values = [f"{days}d" for days in data_cnn.coords["ahead"].values]
+            if len(data_cnn.shape) > 1:
+                data_cnn.coords["ahead"].values = [f"{days}d" for days in data_cnn.coords["ahead"].values]
 
-            data_orig = data.sel(type="orig", ahead=1).squeeze()
-            data_orig.coords["ahead"] = "orig"
+            data_obs = data.sel(type="obs", ahead=1).squeeze()
+            data_obs.coords["ahead"] = "obs"
 
-            data_concat = xr.concat([data_orig, data_cnn], dim="ahead")
+            data_concat = xr.concat([data_obs, data_cnn], dim="ahead")
             data_concat = data_concat.drop("type")
 
             data_concat.index.values = data_concat.index.values.astype("datetime64[M]").astype(int) % 12 + 1
@@ -189,7 +189,7 @@ class PlotStationMap(RunEnvironment):
         plt.close('all')
 
 
-def plot_conditional_quantiles(stations: list, plot_folder: str = ".", rolling_window: int = 3, ref_name: str = 'orig',
+def plot_conditional_quantiles(stations: list, plot_folder: str = ".", rolling_window: int = 3, ref_name: str = 'obs',
                                pred_name: str = 'CNN', season: str = "", forecast_path: str = None,
                                plot_name_affix: str = "", units: str = "ppb"):
     """
@@ -222,7 +222,7 @@ def plot_conditional_quantiles(stations: list, plot_folder: str = ".", rolling_w
         for station in stations:
             file = os.path.join(forecast_path, f"forecasts_{station}_test.nc")
             data_tmp = xr.open_dataarray(file)
-            data_collector.append(data_tmp.loc[:, :, ['CNN', 'orig', 'OLS']].assign_coords(station=station))
+            data_collector.append(data_tmp.loc[:, :, ['CNN', 'obs', 'OLS']].assign_coords(station=station))
         return xr.concat(data_collector, dim='station').transpose('index', 'type', 'ahead', 'station')
 
     def segment_data(data):
@@ -252,7 +252,7 @@ def plot_conditional_quantiles(stations: list, plot_folder: str = ".", rolling_w
 
     def labels(plot_type, data_unit="ppb"):
         names = (f"forecast concentration (in {data_unit})", f"observed concentration (in {data_unit})")
-        if plot_type == "orig":
+        if plot_type == "obs":
             return names
         else:
             return names[::-1]
@@ -477,3 +477,120 @@ class PlotCompetitiveSkillScore(RunEnvironment):
         logging.debug(f"... save plot to {plot_name}")
         plt.savefig(plot_name, dpi=500)
         plt.close()
+
+
+class PlotTimeSeries(RunEnvironment):
+
+    def __init__(self, stations: List, data_path: str, name: str, window_lead_time: int = None, plot_folder: str = ".",
+                 sampling="daily"):
+        super().__init__()
+        self._data_path = data_path
+        self._data_name = name
+        self._stations = stations
+        self._window_lead_time = self._get_window_lead_time(window_lead_time)
+        self._sampling = self._get_sampling(sampling)
+        self._plot(plot_folder)
+
+    @staticmethod
+    def _get_sampling(sampling):
+        if sampling == "daily":
+            return "D"
+        elif sampling == "hourly":
+            return "h"
+
+    def _get_window_lead_time(self, window_lead_time: int):
+        """
+        Extract the lead time from data and arguments. If window_lead_time is not given, extract this information from
+        data itself by the number of ahead dimensions. If given, check if data supports the given length. If the number
+        of ahead dimensions in data is lower than the given lead time, data's lead time is used.
+        :param window_lead_time: lead time from arguments to validate
+        :return: validated lead time, comes either from given argument or from data itself
+        """
+        ahead_steps = len(self._load_data(self._stations[0]).ahead)
+        if window_lead_time is None:
+            window_lead_time = ahead_steps
+        return min(ahead_steps, window_lead_time)
+
+    def _load_data(self, station):
+        logging.debug(f"... preprocess station {station}")
+        file_name = os.path.join(self._data_path, self._data_name % station)
+        data = xr.open_dataarray(file_name)
+        return data.sel(type=["CNN", "obs"])
+
+    def _plot(self, plot_folder):
+        pdf_pages = self._create_pdf_pages(plot_folder)
+        start, end = self._get_time_range(self._load_data(self._stations[0]))
+        for station in self._stations:
+            data = self._load_data(station)
+            fig, axes, factor = self._create_subplots(start, end)
+            nan_list = []
+            for i_year in range(end - start + 1):
+                data_year = data.sel(index=f"{start + i_year}")
+                for i_half_of_year in range(factor):
+                    pos = factor * i_year + i_half_of_year
+                    plot_data = self._create_plot_data(data_year, factor, i_half_of_year)
+                    self._plot_obs(axes[pos], plot_data)
+                    self._plot_ahead(axes[pos], plot_data)
+                    if np.isnan(plot_data.values).all():
+                        nan_list.append(pos)
+            self._clean_up_axes(nan_list, axes, fig)
+            self._save_page(station, pdf_pages)
+        pdf_pages.close()
+        plt.close('all')
+
+    @staticmethod
+    def _clean_up_axes(nan_list, axes, fig):
+        for i in reversed(nan_list):
+            fig.delaxes(axes[i])
+
+    @staticmethod
+    def _save_page(station, pdf_pages):
+        plt.suptitle(station)
+        plt.legend()
+        plt.tight_layout()
+        pdf_pages.savefig(dpi=500)
+
+    @staticmethod
+    def _create_plot_data(data, factor, running_index):
+        if factor > 1:
+            if running_index == 0:
+                data = data.where(data["index.month"] < 7)
+            else:
+                data = data.where(data["index.month"] >= 7)
+        return data
+
+    def _create_subplots(self, start, end):
+        factor = 1
+        if self._sampling == "h":
+            factor = 2
+        f, ax = plt.subplots((end - start + 1) * factor, sharey=True, figsize=(50, 30))
+        return f, ax, factor
+
+    def _plot_ahead(self, ax, data):
+        color = sns.color_palette("Blues_d", self._window_lead_time).as_hex()
+        for ahead in data.coords["ahead"].values:
+            plot_data = data.sel(type="CNN", ahead=ahead).drop(["type", "ahead"]).squeeze()
+            index = plot_data.index + np.timedelta64(int(ahead), self._sampling)
+            label = f"{ahead}{self._sampling}"
+            ax.plot(index, plot_data.values, color=color[ahead-1], label=label)
+
+    def _plot_obs(self, ax, data):
+        obs_data = data.sel(type="obs", ahead=1)
+        index = data.index + np.timedelta64(1, self._sampling)
+        ax.plot(index, obs_data.values, color=matplotlib.colors.cnames["green"], label="obs")
+
+    @staticmethod
+    def _get_time_range(data):
+        def f(x, f_x):
+            return pd.to_datetime(f_x(x.index.values)).year
+        return f(data, min), f(data, max)
+
+    @staticmethod
+    def _create_pdf_pages(plot_folder):
+        """
+        Standard save method to store plot locally. The name of this plot is static.
+        :param plot_folder: path to save the plot
+        """
+        plot_name = os.path.join(os.path.abspath(plot_folder), 'timeseries_plot.pdf')
+        logging.debug(f"... save plot to {plot_name}")
+        return matplotlib.backends.backend_pdf.PdfPages(plot_name)
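`PlotTimeSeries` is driven entirely by the stored forecast files; the `%s` placeholder in the name template is filled with each station id, and `sampling` decides whether one (daily) or two (hourly) panels are drawn per year. A hypothetical stand-alone call, mirroring its use in `PostProcessing`:

    from src.plotting.postprocessing_plotting import PlotTimeSeries

    PlotTimeSeries(stations=["DEBW107", "DEBY081"], data_path="forecasts",
                   name=r"forecasts_%s_test.nc", plot_folder=".", sampling="daily")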
diff --git a/src/plotting/training_monitoring.py b/src/plotting/training_monitoring.py
index 339ba63711cde8122f2879d66b66d490429e6d23..7e656895c5eecdabe1ef26869b68fb9494ed4c8c 100644
--- a/src/plotting/training_monitoring.py
+++ b/src/plotting/training_monitoring.py
@@ -5,9 +5,9 @@ __date__ = '2019-12-11'
 
 from typing import Union, Dict, List
 
 import keras
-import pandas as pd
 import matplotlib
 import matplotlib.pyplot as plt
+import pandas as pd
 
 from src.model_modules.keras_extensions import LearningRateDecay
diff --git a/src/run_modules/experiment_setup.py b/src/run_modules/experiment_setup.py
index 834d0c57e8c82b2eaee687a751318266c1515943..9c5d68688462ed33e91151dab685af5811cc3120 100644
--- a/src/run_modules/experiment_setup.py
+++ b/src/run_modules/experiment_setup.py
@@ -2,15 +2,14 @@ __author__ = "Lukas Leufen, Felix Kleinert"
 __date__ = '2019-11-15'
 
-import logging
 import argparse
-from typing import Union, Dict, Any
+import logging
 import os
+from typing import Union, Dict, Any
 
 from src import helpers
 from src.run_modules.run_environment import RunEnvironment
 
-
 DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBY052', 'DEBY032', 'DEBW022', 'DEBY004',
                     'DEBY020', 'DEBW030', 'DEBW037', 'DEBW031', 'DEBW015', 'DEBW073', 'DEBY039', 'DEBW038', 'DEBW081',
                     'DEBY075', 'DEBW040', 'DEBY053', 'DEBW059', 'DEBW027', 'DEBY072', 'DEBW042', 'DEBW039', 'DEBY001',
@@ -32,24 +31,28 @@ class ExperimentSetup(RunEnvironment):
                  statistics_per_var=None, start=None, end=None, window_history_size=None, target_var="o3",
                  target_dim=None, window_lead_time=None, dimensions=None, interpolate_dim=None,
                  interpolate_method=None, limit_nan_fill=None, train_start=None, train_end=None, val_start=None, val_end=None, test_start=None,
-                 test_end=None, use_all_stations_on_all_data_sets=True, trainable=False, fraction_of_train=None,
+                 test_end=None, use_all_stations_on_all_data_sets=True, trainable=None, fraction_of_train=None,
                  experiment_path=None, plot_path=None, forecast_path=None, overwrite_local_data=None, sampling="daily",
-                 bootstrap_path=None):
+                 create_new_model=None, bootstrap_path=None):
 
         # create run framework
         super().__init__()
 
         # experiment setup
-        self._set_param("data_path", helpers.prepare_host())
+        self._set_param("data_path", helpers.prepare_host(sampling=sampling))
+        self._set_param("create_new_model", create_new_model, default=True)
+        if self.data_store.get("create_new_model", "general"):
+            trainable = True
         data_path = self.data_store.get("data_path", "general")
         bootstrap_path = helpers.set_bootstrap_path(bootstrap_path, data_path, sampling)
         self._set_param("bootstrap_path", bootstrap_path)
-        self._set_param("trainable", trainable, default=False)
+        self._set_param("trainable", trainable, default=True)
         self._set_param("fraction_of_training", fraction_of_train, default=0.8)
 
         # set experiment name
         exp_date = self._get_parser_args(parser_args).get("experiment_date")
-        exp_name, exp_path = helpers.set_experiment_name(experiment_date=exp_date, experiment_path=experiment_path)
+        exp_name, exp_path = helpers.set_experiment_name(experiment_date=exp_date, experiment_path=experiment_path,
+                                                         sampling=sampling)
         self._set_param("experiment_name", exp_name)
         self._set_param("experiment_path", exp_path)
         helpers.check_path_and_create(self.data_store.get("experiment_path", "general"))
@@ -76,6 +79,7 @@ class ExperimentSetup(RunEnvironment):
         self._set_param("end", end, default="2017-12-31", scope="general")
         self._set_param("window_history_size", window_history_size, default=13)
         self._set_param("overwrite_local_data", overwrite_local_data, default=False, scope="general.preprocessing")
+        self._set_param("sampling", sampling)
 
         # target
         self._set_param("target_var", target_var, default="o3")
@@ -88,19 +92,19 @@ class ExperimentSetup(RunEnvironment):
         self._set_param("interpolate_method", interpolate_method, default='linear')
         self._set_param("limit_nan_fill", limit_nan_fill, default=1)
 
-        # train parameters
+        # train set parameters
         self._set_param("start", train_start, default="1997-01-01", scope="general.train")
         self._set_param("end", train_end, default="2007-12-31", scope="general.train")
 
-        # validation parameters
+        # validation set parameters
         self._set_param("start", val_start, default="2008-01-01", scope="general.val")
         self._set_param("end", val_end, default="2009-12-31", scope="general.val")
 
-        # test parameters
+        # test set parameters
         self._set_param("start", test_start, default="2010-01-01", scope="general.test")
         self._set_param("end", test_end, default="2017-12-31", scope="general.test")
 
-        # train_val parameters
+        # train_val set parameters
         self._set_param("start", self.data_store.get("start", "general.train"), scope="general.train_val")
         self._set_param("end", self.data_store.get("end", "general.val"), scope="general.train_val")
diff --git a/src/run_modules/model_setup.py b/src/run_modules/model_setup.py
index 4a72189283ff1bedc3014b29cdb752fe244c84bf..e3945a542d60b09dc9855bd28be87cdba729ed72 100644
--- a/src/run_modules/model_setup.py
+++ b/src/run_modules/model_setup.py
@@ -2,19 +2,17 @@ __author__ = "Lukas Leufen, Felix Kleinert"
 __date__ = '2019-12-02'
 
-import keras
-from keras import losses
-import tensorflow as tf
 import logging
 import os
 
-from src.run_modules.run_environment import RunEnvironment
-from src.helpers import l_p_loss
-from src.model_modules.keras_extensions import HistoryAdvanced, ModelCheckpointAdvanced
-from src.model_modules.inception_model import InceptionModelBase
-from src.model_modules.flatten import flatten_tail
+import keras
+import tensorflow as tf
+
+from src.model_modules.keras_extensions import HistoryAdvanced, CallbackHandler
 # from src.model_modules.model_class import MyBranchedModel as MyModel
-from src.model_modules.model_class import MyLittleModel as MyModel
+# from src.model_modules.model_class import MyLittleModel as MyModel
+from src.model_modules.model_class import MyTowerModel as MyModel
+from src.run_modules.run_environment import RunEnvironment
@@ -27,8 +25,12 @@ class ModelSetup(RunEnvironment):
         path = self.data_store.get("experiment_path", "general")
         exp_name = self.data_store.get("experiment_name", "general")
         self.scope = "general.model"
-        self.checkpoint_name = os.path.join(path, f"{exp_name}_model-best.h5")
-        self.callbacks_name = os.path.join(path, f"{exp_name}_model-best-callbacks-%s.pickle")
+        self.path = os.path.join(path, f"{exp_name}_%s")
+        self.model_name = self.path % "%s.h5"
+        self.checkpoint_name = self.path % "model-best.h5"
+        self.callbacks_name = self.path % "model-best-callbacks-%s.pickle"
+        self._trainable = self.data_store.get("trainable", "general")
+        self._create_new_model = self.data_store.get("create_new_model", "general")
         self._run()
 
     def _run(self):
@@ -43,11 +45,11 @@ class ModelSetup(RunEnvironment):
         self.plot_model()
 
         # load weights if no training shall be performed
-        if self.data_store.get("trainable", self.scope) is False:
+        if not self._trainable and not self._create_new_model:
             self.load_weights()
 
         # create checkpoint
-        self._set_checkpoint()
+        self._set_callbacks()
 
         # compile model
         self.compile_model()
@@ -62,24 +64,25 @@ class ModelSetup(RunEnvironment):
         self.model.compile(optimizer=optimizer, loss=loss, metrics=["mse", "mae"])
         self.data_store.set("model", self.model, self.scope)
 
-    def _set_checkpoint(self):
+    def _set_callbacks(self):
         """
-        Must be run after all callback functions that shall be tracked during training have been created (currently this
-        affects the learning rate decay and the advanced history [actually created in this method]).
+        Set all callbacks for the training phase. Add all callbacks with the .add_callback statement. Finally, the
+        advanced model checkpoint is added.
         """
         lr = self.data_store.get("lr_decay", scope="general.model")
         hist = HistoryAdvanced()
         self.data_store.set("hist", hist, scope="general.model")
-        callbacks = [{"callback": lr, "path": self.callbacks_name % "lr"},
-                     {"callback": hist, "path": self.callbacks_name % "hist"}]
-        checkpoint = ModelCheckpointAdvanced(filepath=self.checkpoint_name, verbose=1, monitor='val_loss',
-                                             save_best_only=True, mode='auto', callbacks=callbacks)
-        self.data_store.set("checkpoint", checkpoint, self.scope)
+        callbacks = CallbackHandler()
+        callbacks.add_callback(lr, self.callbacks_name % "lr", "lr")
+        callbacks.add_callback(hist, self.callbacks_name % "hist", "hist")
+        callbacks.create_model_checkpoint(filepath=self.checkpoint_name, verbose=1, monitor='val_loss',
+                                          save_best_only=True, mode='auto')
+        self.data_store.set("callbacks", callbacks, self.scope)
 
     def load_weights(self):
         try:
-            self.model.load_weights(self.checkpoint_name)
-            logging.info('reload weights...')
+            self.model.load_weights(self.model_name)
+            logging.info(f"reload weights from model {self.model_name} ...")
         except OSError:
             logging.info('no weights to reload...')
@@ -92,97 +95,10 @@ class ModelSetup(RunEnvironment):
     def get_model_settings(self):
         model_settings = self.model.get_settings()
         self.data_store.set_args_from_dict(model_settings, self.scope)
+        self.model_name = self.model_name % self.data_store.get_default("model_name", self.scope, "my_model")
+        self.data_store.set("model_name", self.model_name, self.scope)
 
     def plot_model(self):  # pragma: no cover
         with tf.device("/cpu:0"):
-            path = self.data_store.get("experiment_path", "general")
-            name = self.data_store.get("experiment_name", "general") + "_model.pdf"
-            file_name = os.path.join(path, name)
+            file_name = f"{self.model_name.split(sep='.')[0]}.pdf"
             keras.utils.plot_model(self.model, to_file=file_name, show_shapes=True, show_layer_names=True)
-
-
-def my_loss():
-    loss = l_p_loss(4)
-    keras_loss = losses.mean_squared_error
-    loss_all = [loss] + [keras_loss]
-    return loss_all
-
-
-def my_little_loss():
-    return losses.mean_squared_error
-
-
-def my_little_model(activation, window_history_size, channels, regularizer, dropout_rate, window_lead_time):
-
-    X_input = keras.layers.Input(
-        shape=(window_history_size + 1, 1, channels))  # add 1 to window_size to include current time step t0
-    X_in = keras.layers.Conv2D(32, (1, 1), padding='same', name='{}_Conv_1x1'.format("major"))(X_input)
-    X_in = activation(name='{}_conv_act'.format("major"))(X_in)
-    X_in = keras.layers.Flatten(name='{}'.format("major"))(X_in)
-    X_in = keras.layers.Dropout(dropout_rate, name='{}_Dropout_1'.format("major"))(X_in)
-    X_in = keras.layers.Dense(64, name='{}_Dense_64'.format("major"))(X_in)
-    X_in = activation()(X_in)
-    X_in = keras.layers.Dense(32, name='{}_Dense_32'.format("major"))(X_in)
-    X_in = activation()(X_in)
-    X_in = keras.layers.Dense(16, name='{}_Dense_16'.format("major"))(X_in)
-    X_in = activation()(X_in)
-    X_in = keras.layers.Dense(window_lead_time, name='{}_Dense'.format("major"))(X_in)
-    out_main = activation()(X_in)
-    return keras.Model(inputs=X_input, outputs=[out_main])
-
-
-def my_model(activation, window_history_size, channels, regularizer, dropout_rate, window_lead_time):
-
-    conv_settings_dict1 = {
-        'tower_1': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (3, 1), 'activation': activation},
-        'tower_2': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (5, 1), 'activation': activation},
-        'tower_3': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (1, 1), 'activation': activation},
-    }
-
-    pool_settings_dict1 = {'pool_kernel': (3, 1), 'tower_filter': 8 * 2, 'activation': activation}
-
-    conv_settings_dict2 = {'tower_1': {'reduction_filter': 8 * 2, 'tower_filter': 16 * 2 * 2, 'tower_kernel': (3, 1),
-                                       'activation': activation},
-                           'tower_2': {'reduction_filter': 8 * 2, 'tower_filter': 16 * 2 * 2, 'tower_kernel': (5, 1),
-                                       'activation': activation},
-                           'tower_3': {'reduction_filter': 8 * 2, 'tower_filter': 16 * 2 * 2, 'tower_kernel': (1, 1),
-                                       'activation': activation},
-                           }
-    pool_settings_dict2 = {'pool_kernel': (3, 1), 'tower_filter': 16, 'activation': activation}
-
-    conv_settings_dict3 = {'tower_1': {'reduction_filter': 16 * 4, 'tower_filter': 32 * 2, 'tower_kernel': (3, 1),
-                                       'activation': activation},
-                           'tower_2': {'reduction_filter': 16 * 4, 'tower_filter': 32 * 2, 'tower_kernel': (5, 1),
-                                       'activation': activation},
-                           'tower_3': {'reduction_filter': 16 * 4, 'tower_filter': 32 * 2, 'tower_kernel': (1, 1),
-                                       'activation': activation},
-                           }
-
-    pool_settings_dict3 = {'pool_kernel': (3, 1), 'tower_filter': 32, 'activation': activation}
-
-    ##########################################
-    inception_model = InceptionModelBase()
-
-    X_input = keras.layers.Input(shape=(window_history_size + 1, 1, channels))  # add 1 to window_size to include current time step t0
-
-    X_in = inception_model.inception_block(X_input, conv_settings_dict1, pool_settings_dict1, regularizer=regularizer,
-                                           batch_normalisation=True)
-
-    out_minor = flatten_tail(X_in, 'Minor_1', bound_weight=True, activation=activation, dropout_rate=dropout_rate,
-                             reduction_filter=4, first_dense=32, window_lead_time=window_lead_time)
-
-    X_in = keras.layers.Dropout(dropout_rate)(X_in)
-
-    X_in = inception_model.inception_block(X_in, conv_settings_dict2, pool_settings_dict2, regularizer=regularizer,
-                                           batch_normalisation=True)
-
-    X_in = keras.layers.Dropout(dropout_rate)(X_in)
-
-    X_in = inception_model.inception_block(X_in, conv_settings_dict3, pool_settings_dict3, regularizer=regularizer,
-                                           batch_normalisation=True)
-    #############################################
-
-    out_main = flatten_tail(X_in, 'Main', activation=activation, bound_weight=True, dropout_rate=dropout_rate,
-                            reduction_filter=64, first_dense=64, window_lead_time=window_lead_time)
-
-    return keras.Model(inputs=X_input, outputs=[out_minor, out_main])
diff --git a/src/run_modules/post_processing.py b/src/run_modules/post_processing.py
index e6f271ce3cc6cf2548ff5b06ba40e2fd509f8c8d..06203c879872891f57c719040482fe052824c65e 100644
--- a/src/run_modules/post_processing.py
+++ b/src/run_modules/post_processing.py
@@ -5,20 +5,21 @@ __date__ = '2019-12-11'
 
 import logging
 import os
 
+import keras
 import numpy as np
 import pandas as pd
 import xarray as xr
-import keras
 
-from src.run_modules.run_environment import RunEnvironment
+from src import statistics
 from src.data_handling.data_distributor import Distributor
 from src.data_handling.data_generator import DataGenerator
-from src.model_modules.linear_model import OrdinaryLeastSquaredModel
-from src import statistics
-from src.plotting.postprocessing_plotting import plot_conditional_quantiles
-from src.plotting.postprocessing_plotting import PlotMonthlySummary, PlotStationMap, PlotClimatologicalSkillScore, PlotCompetitiveSkillScore
 from src.datastore import NameNotFoundInDataStore
 from src.helpers import TimeTracking
+from src.model_modules.linear_model import OrdinaryLeastSquaredModel
+from src.plotting.postprocessing_plotting import PlotMonthlySummary, PlotStationMap, PlotClimatologicalSkillScore, \
+    PlotCompetitiveSkillScore, PlotTimeSeries
+from src.plotting.postprocessing_plotting import plot_conditional_quantiles
+from src.run_modules.run_environment import RunEnvironment
@@ -33,6 +34,8 @@ class PostProcessing(RunEnvironment):
         self.train_data: DataGenerator = self.data_store.get("generator", "general.train")
         self.train_val_data: DataGenerator = self.data_store.get("generator", "general.train_val")
         self.plot_path: str = self.data_store.get("plot_path", "general")
+        self.target_var = self.data_store.get("target_var", "general")
+        self._sampling = self.data_store.get("sampling", "general")
         self.skill_scores = None
         self._run()
@@ -40,9 +43,9 @@ class PostProcessing(RunEnvironment):
         with TimeTracking():
             self.train_ols_model()
             logging.info("take a look on the next reported time measure. If this increases a lot, one should think to "
-                         "skip make_prediction() whenever it is possible to save time.")
+                         "skip train_ols_model() whenever it is possible to save time.")
         with TimeTracking():
-            preds_for_all_stations = self.make_prediction()
+            self.make_prediction()
             logging.info("take a look on the next reported time measure. If this increases a lot, one should think to "
                          "skip make_prediction() whenever it is possible to save time.")
         self.skill_scores = self.calculate_skill_scores()
@@ -52,29 +55,27 @@ class PostProcessing(RunEnvironment):
         try:
             model = self.data_store.get("best_model", "general")
         except NameNotFoundInDataStore:
-            logging.info("no model saved in data store. trying to load model from experiment")
-            path = self.data_store.get("experiment_path", "general")
-            name = f"{self.data_store.get('experiment_name', 'general')}_my_model.h5"
-            model_name = os.path.join(path, name)
+            logging.info("no model saved in data store. trying to load model from experiment path")
+            model_name = self.data_store.get("model_name", "general.model")
             model = keras.models.load_model(model_name)
         return model
 
     def plot(self):
         logging.debug("Run plotting routines...")
         path = self.data_store.get("forecast_path", "general")
-        target_var = self.data_store.get("target_var", "general")
 
-        plot_conditional_quantiles(self.test_data.stations, pred_name="CNN", ref_name="orig",
+        plot_conditional_quantiles(self.test_data.stations, pred_name="CNN", ref_name="obs",
                                    forecast_path=path, plot_name_affix="cali-ref", plot_folder=self.plot_path)
-        plot_conditional_quantiles(self.test_data.stations, pred_name="orig", ref_name="CNN",
+        plot_conditional_quantiles(self.test_data.stations, pred_name="obs", ref_name="CNN",
                                    forecast_path=path, plot_name_affix="like-bas", plot_folder=self.plot_path)
         PlotStationMap(generators={'b': self.test_data}, plot_folder=self.plot_path)
-        PlotMonthlySummary(self.test_data.stations, path, r"forecasts_%s_test.nc", target_var,
+        PlotMonthlySummary(self.test_data.stations, path, r"forecasts_%s_test.nc", self.target_var,
                            plot_folder=self.plot_path)
         PlotClimatologicalSkillScore(self.skill_scores[1], plot_folder=self.plot_path, model_setup="CNN")
         PlotClimatologicalSkillScore(self.skill_scores[1], plot_folder=self.plot_path, score_only=False,
                                      extra_name_tag="all_terms_", model_setup="CNN")
         PlotCompetitiveSkillScore(self.skill_scores[0], plot_folder=self.plot_path, model_setup="CNN")
+        PlotTimeSeries(self.test_data.stations, path, r"forecasts_%s_test.nc", plot_folder=self.plot_path, sampling=self._sampling)
+            model_name = self.data_store.get("model_name", "general.model")
             model = keras.models.load_model(model_name)
         return model
 
     def plot(self):
         logging.debug("Run plotting routines...")
         path = self.data_store.get("forecast_path", "general")
-        target_var = self.data_store.get("target_var", "general")
 
-        plot_conditional_quantiles(self.test_data.stations, pred_name="CNN", ref_name="orig",
+        plot_conditional_quantiles(self.test_data.stations, pred_name="CNN", ref_name="obs",
                                    forecast_path=path, plot_name_affix="cali-ref", plot_folder=self.plot_path)
-        plot_conditional_quantiles(self.test_data.stations, pred_name="orig", ref_name="CNN",
+        plot_conditional_quantiles(self.test_data.stations, pred_name="obs", ref_name="CNN",
                                    forecast_path=path, plot_name_affix="like-bas", plot_folder=self.plot_path)
         PlotStationMap(generators={'b': self.test_data}, plot_folder=self.plot_path)
-        PlotMonthlySummary(self.test_data.stations, path, r"forecasts_%s_test.nc", target_var,
+        PlotMonthlySummary(self.test_data.stations, path, r"forecasts_%s_test.nc", self.target_var,
                            plot_folder=self.plot_path)
         PlotClimatologicalSkillScore(self.skill_scores[1], plot_folder=self.plot_path, model_setup="CNN")
         PlotClimatologicalSkillScore(self.skill_scores[1], plot_folder=self.plot_path, score_only=False,
                                      extra_name_tag="all_terms_", model_setup="CNN")
         PlotCompetitiveSkillScore(self.skill_scores[0], plot_folder=self.plot_path, model_setup="CNN")
+        PlotTimeSeries(self.test_data.stations, path, r"forecasts_%s_test.nc", plot_folder=self.plot_path, sampling=self._sampling)
 
     def calculate_test_score(self):
         test_score = self.model.evaluate_generator(generator=self.test_data_distributed.distribute_on_batches(),
@@ -91,17 +92,16 @@
     def train_ols_model(self):
         self.ols_model = OrdinaryLeastSquaredModel(self.train_data)
 
-    def make_prediction(self, freq="1D"):
+    def make_prediction(self):
         logging.debug("start make_prediction")
-        nn_prediction_all_stations = []
-        for i, v in enumerate(self.test_data):
+        for i, _ in enumerate(self.test_data):
             data = self.test_data.get_data_generator(i)
             nn_prediction, persistence_prediction, ols_prediction = self._create_empty_prediction_arrays(data, count=3)
-            input_data = self.test_data[i][0]
+            input_data = data.get_transposed_history()
 
             # get scaling parameters
-            mean, std, transformation_method = data.get_transformation_information(variable='o3')
+            mean, std, transformation_method = data.get_transformation_information(variable=self.target_var)
 
             # nn forecast
             nn_prediction = self._create_nn_forecast(input_data, nn_prediction, mean, std, transformation_method)
@@ -113,15 +113,15 @@
             # ols
             ols_prediction = self._create_ols_forecast(input_data, ols_prediction, mean, std, transformation_method)
 
-            # orig pred
-            orig_pred = self._create_orig_forecast(data, None, mean, std, transformation_method)
+            # observation
+            observation = self._create_observation(data, None, mean, std, transformation_method)
 
             # merge all predictions
-            full_index = self.create_fullindex(data.data.indexes['datetime'], freq)
+            full_index = self.create_fullindex(data.data.indexes['datetime'], self._get_frequency())
             all_predictions = self.create_forecast_arrays(full_index, list(data.label.indexes['window']),
                                                           CNN=nn_prediction,
                                                           persi=persistence_prediction,
-                                                          orig=orig_pred,
+                                                          obs=observation,
                                                           OLS=ols_prediction)
 
             # save all forecasts locally
@@ -129,22 +129,24 @@
             file = os.path.join(path, f"forecasts_{data.station[0]}_test.nc")
             all_predictions.to_netcdf(file)
 
-        # save nn forecast to return variable
-        nn_prediction_all_stations.append(nn_prediction)
-        return nn_prediction_all_stations
+    def _get_frequency(self):
+        getter = {"daily": "1D", "hourly": "1H"}
+        return getter.get(self._sampling, None)
 
     @staticmethod
-    def _create_orig_forecast(data, _, mean, std, transformation_method):
+    def _create_observation(data, _, mean, std, transformation_method):
         return statistics.apply_inverse_transformation(data.label.copy(), mean, std, transformation_method)
 
     def _create_ols_forecast(self, input_data, ols_prediction, mean, std, transformation_method):
         tmp_ols = self.ols_model.predict(input_data)
         tmp_ols = statistics.apply_inverse_transformation(tmp_ols, mean, std, transformation_method)
-        ols_prediction.values = np.swapaxes(np.expand_dims(tmp_ols, axis=1), 2, 0)
+        tmp_ols = np.expand_dims(tmp_ols, axis=1)
+        target_shape = ols_prediction.values.shape
+        ols_prediction.values = np.swapaxes(tmp_ols, 2, 0) if target_shape != tmp_ols.shape else tmp_ols
         return ols_prediction
 
     def _create_persistence_forecast(self, input_data, persistence_prediction, mean, std, transformation_method):
-        tmp_persi = input_data.sel({'window': 0, 'variables': 'o3'})
+        tmp_persi = input_data.sel({'window': 0, 'variables': self.target_var})
         tmp_persi = statistics.apply_inverse_transformation(tmp_persi, mean, std, transformation_method)
         window_lead_time = self.data_store.get("window_lead_time", "general")
         persistence_prediction.values = np.expand_dims(np.tile(tmp_persi.squeeze('Stations'), (window_lead_time, 1)),
@@ -224,8 +226,8 @@
     def _get_external_data(self, station):
         try:
             data = self.train_val_data.get_data_generator(station)
-            mean, std, transformation_method = data.get_transformation_information(variable='o3')
-            external_data = self._create_orig_forecast(data, None, mean, std, transformation_method)
+            mean, std, transformation_method = data.get_transformation_information(variable=self.target_var)
+            external_data = self._create_observation(data, None, mean, std, transformation_method)
             external_data = external_data.squeeze("Stations").sel(window=1).drop(["window", "Stations", "variables"])
             return external_data.rename({'datetime': 'index'})
         except KeyError:
diff --git a/src/run_modules/pre_processing.py b/src/run_modules/pre_processing.py
index 5dc61738c37240326579e60d487ad4423302682e..4660a8116b6d0b860a7d0d50b92cee5e0deb77d8 100644
--- a/src/run_modules/pre_processing.py
+++ b/src/run_modules/pre_processing.py
@@ -7,13 +7,12 @@
 from typing import Tuple, Dict, List
 
 from src.data_handling.data_generator import DataGenerator
 from src.helpers import TimeTracking
-from src.run_modules.run_environment import RunEnvironment
 from src.join import EmptyQueryResult
-
+from src.run_modules.run_environment import RunEnvironment
 
 DEFAULT_ARGS_LIST = ["data_path", "network", "stations", "variables", "interpolate_dim", "target_dim", "target_var"]
 DEFAULT_KWARGS_LIST = ["limit_nan_fill", "window_history_size", "window_lead_time", "statistics_per_var",
-                       "station_type", "overwrite_local_data", "start", "end"]
+                       "station_type", "overwrite_local_data", "start", "end", "sampling"]
 
 
 class PreProcessing(RunEnvironment):
diff --git a/src/run_modules/training.py b/src/run_modules/training.py
index 99afd8300ca28fec3a589e69fa5b4eff1a37914a..7a522af0298bcabee62579f68bd29ed123cac7b0 100644
--- a/src/run_modules/training.py
+++ b/src/run_modules/training.py
@@ -1,16 +1,17 @@
 __author__ = "Lukas Leufen, Felix Kleinert"
Kleinert" __date__ = '2019-12-05' +import json import logging import os -import json -import keras import pickle -from src.run_modules.run_environment import RunEnvironment +import keras + from src.data_handling.data_distributor import Distributor +from src.model_modules.keras_extensions import LearningRateDecay, ModelCheckpointAdvanced, CallbackHandler from src.plotting.training_monitoring import PlotModelHistory, PlotModelLearningRate -from src.model_modules.keras_extensions import LearningRateDecay, ModelCheckpointAdvanced +from src.run_modules.run_environment import RunEnvironment class Training(RunEnvironment): @@ -23,10 +24,10 @@ class Training(RunEnvironment): self.test_set = None self.batch_size = self.data_store.get("batch_size", "general.model") self.epochs = self.data_store.get("epochs", "general.model") - self.checkpoint: ModelCheckpointAdvanced = self.data_store.get("checkpoint", "general.model") - self.lr_sc = self.data_store.get("lr_decay", "general.model") - self.hist = self.data_store.get("hist", "general.model") + self.callbacks: CallbackHandler = self.data_store.get("callbacks", "general.model") self.experiment_name = self.data_store.get("experiment_name", "general") + self._trainable = self.data_store.get("trainable", "general") + self._create_new_model = self.data_store.get("create_new_model", "general") self._run() def _run(self) -> None: @@ -43,8 +44,11 @@ class Training(RunEnvironment): """ self.set_generators() self.make_predict_function() - self.train() - self.save_model() + if self._trainable: + self.train() + self.save_model() + else: + logging.info("No training has started, because trainable parameter was false.") def make_predict_function(self) -> None: """ @@ -81,46 +85,41 @@ class Training(RunEnvironment): locally stored information and the corresponding model and proceed with the already started training. """ logging.info(f"Train with {len(self.train_set)} mini batches.") - if not os.path.exists(self.checkpoint.filepath): + checkpoint = self.callbacks.get_checkpoint() + if not os.path.exists(checkpoint.filepath) or self._create_new_model: history = self.model.fit_generator(generator=self.train_set.distribute_on_batches(), steps_per_epoch=len(self.train_set), epochs=self.epochs, verbose=2, validation_data=self.val_set.distribute_on_batches(), validation_steps=len(self.val_set), - callbacks=[self.lr_sc, self.hist, self.checkpoint]) + callbacks=self.callbacks.get_callbacks(as_dict=False)) else: logging.info("Found locally stored model and checkpoints. 
Training is resumed from the last checkpoint.") - lr_filepath = self.checkpoint.callbacks[0]["path"] - hist_filepath = self.checkpoint.callbacks[1]["path"] - self.lr_sc = pickle.load(open(lr_filepath, "rb")) - self.hist = pickle.load(open(hist_filepath, "rb")) - self.model = keras.models.load_model(self.checkpoint.filepath) - initial_epoch = max(self.hist.epoch) + 1 - callbacks = [{"callback": self.lr_sc, "path": lr_filepath}, - {"callback": self.hist, "path": hist_filepath}] - self.checkpoint.update_callbacks(callbacks) - self.checkpoint.update_best(self.hist) + self.callbacks.load_callbacks() + self.callbacks.update_checkpoint() + self.model = keras.models.load_model(checkpoint.filepath) + hist = self.callbacks.get_callback_by_name("hist") + initial_epoch = max(hist.epoch) + 1 _ = self.model.fit_generator(generator=self.train_set.distribute_on_batches(), steps_per_epoch=len(self.train_set), epochs=self.epochs, verbose=2, validation_data=self.val_set.distribute_on_batches(), validation_steps=len(self.val_set), - callbacks=[self.lr_sc, self.hist, self.checkpoint], + callbacks=self.callbacks.get_callbacks(as_dict=False), initial_epoch=initial_epoch) - history = self.hist - self.save_callbacks_as_json(history) - self.load_best_model(self.checkpoint.filepath) - self.create_monitoring_plots(history, self.lr_sc) + history = hist + lr = self.callbacks.get_callback_by_name("lr") + self.save_callbacks_as_json(history, lr) + self.load_best_model(checkpoint.filepath) + self.create_monitoring_plots(history, lr) def save_model(self) -> None: """ - save model in local experiment directory. Model is named as <experiment_name>_my_model.h5 . + save model in local experiment directory. Model is named as <experiment_name>_<custom_model_name>.h5 . """ - path = self.data_store.get("experiment_path", "general") - name = f"{self.data_store.get('experiment_name', 'general')}_my_model.h5" - model_name = os.path.join(path, name) + model_name = self.data_store.get("model_name", "general.model") logging.debug(f"save best model to {model_name}") self.model.save(model_name) self.data_store.set("best_model", self.model, "general") @@ -137,7 +136,7 @@ class Training(RunEnvironment): except OSError: logging.info('no weights to reload...') - def save_callbacks_as_json(self, history: keras.callbacks.History) -> None: + def save_callbacks_as_json(self, history: keras.callbacks.History, lr_sc: keras.callbacks) -> None: """ Save callbacks (history, learning rate) of training. 
         * history.history -> history.json
@@ -149,7 +148,7 @@
         with open(os.path.join(path, "history.json"), "w") as f:
             json.dump(history.history, f)
         with open(os.path.join(path, "history_lr.json"), "w") as f:
-            json.dump(self.lr_sc.lr, f)
+            json.dump(lr_sc.lr, f)
 
     def create_monitoring_plots(self, history: keras.callbacks.History, lr_sc: LearningRateDecay) -> None:
         """
diff --git a/src/statistics.py b/src/statistics.py
index df73784df830d5f7b96bf0fcd18a65d362516f12..e3481d0e0f0561ac8a903648a69e92c6d6acc40d 100644
--- a/src/statistics.py
+++ b/src/statistics.py
@@ -126,12 +126,12 @@
         return skill_score
 
-    def _climatological_skill_score(self, data, mu_type=1, observation_name="orig", forecast_name="CNN", external_data=None):
+    def _climatological_skill_score(self, data, mu_type=1, observation_name="obs", forecast_name="CNN", external_data=None):
         kwargs = {"external_data": external_data} if external_data is not None else {}
         return self.__getattribute__(f"skill_score_mu_case_{mu_type}")(data, observation_name, forecast_name, **kwargs)
 
     @staticmethod
-    def general_skill_score(data, observation_name="orig", forecast_name="CNN", reference_name="persi"):
+    def general_skill_score(data, observation_name="obs", forecast_name="CNN", reference_name="persi"):
         data = data.dropna("index")
         observation = data.sel(type=observation_name)
         forecast = data.sel(type=forecast_name)
@@ -159,12 +159,12 @@
         suffix = {"mean": mean, "sigma": sigma, "r": r, "p": p}
         return AI, BI, CI, data, suffix
 
-    def skill_score_mu_case_1(self, data, observation_name="orig", forecast_name="CNN"):
+    def skill_score_mu_case_1(self, data, observation_name="obs", forecast_name="CNN"):
         AI, BI, CI, data, _ = self.skill_score_pre_calculations(data, observation_name, forecast_name)
         skill_score = np.array(AI - BI - CI)
         return pd.DataFrame({"skill_score": [skill_score], "AI": [AI], "BI": [BI], "CI": [CI]}).to_xarray().to_array()
 
-    def skill_score_mu_case_2(self, data, observation_name="orig", forecast_name="CNN"):
+    def skill_score_mu_case_2(self, data, observation_name="obs", forecast_name="CNN"):
         AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name)
         monthly_mean = self.create_monthly_mean_from_daily_data(data)
         data = xr.concat([data, monthly_mean], dim="type")
@@ -177,14 +177,14 @@
         skill_score = np.array((AI - BI - CI - AII + BII) / (1 - AII + BII))
         return pd.DataFrame({"skill_score": [skill_score], "AII": [AII], "BII": [BII]}).to_xarray().to_array()
 
-    def skill_score_mu_case_3(self, data, observation_name="orig", forecast_name="CNN", external_data=None):
+    def skill_score_mu_case_3(self, data, observation_name="obs", forecast_name="CNN", external_data=None):
         AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name)
         mean, sigma = suffix["mean"], suffix["sigma"]
         AIII = (((external_data.mean().values - mean.loc[observation_name]) / sigma.loc[observation_name])**2).values
         skill_score = np.array((AI - BI - CI + AIII) / (1 + AIII))
         return pd.DataFrame({"skill_score": [skill_score], "AIII": [AIII]}).to_xarray().to_array()
 
-    def skill_score_mu_case_4(self, data, observation_name="orig", forecast_name="CNN", external_data=None):
+    def skill_score_mu_case_4(self, data, observation_name="obs", forecast_name="CNN", external_data=None):
         AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name)
         monthly_mean_external = self.create_monthly_mean_from_daily_data(external_data, columns=data.type.values,
                                                                          index=data.index)
         data = xr.concat([data, monthly_mean_external], dim="type")
diff --git a/test/test_data_handling/test_data_distributor.py b/test/test_data_handling/test_data_distributor.py
index cb51f20c8771ec49116731f02c7b462a62405394..4c6dbb1c38f2e4a49e53883fbe3cb33cb565118a 100644
--- a/test/test_data_handling/test_data_distributor.py
+++ b/test/test_data_handling/test_data_distributor.py
@@ -1,6 +1,5 @@
 import math
 import os
-import shutil
 
 import keras
 import numpy as np
diff --git a/test/test_data_handling/test_data_generator.py b/test/test_data_handling/test_data_generator.py
index 34cc60d7b6a9ccbd4d30463f185ce5cf6eff6f15..142acd166604951352ad6686548c2cb76f609ce0 100644
--- a/test/test_data_handling/test_data_generator.py
+++ b/test/test_data_handling/test_data_generator.py
@@ -1,5 +1,7 @@
-import pytest
 import os
+
+import pytest
+
 import shutil
 import numpy as np
 import pickle
@@ -18,7 +20,7 @@ class TestDataGenerator:
     @pytest.fixture
     def gen(self):
         return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', 'DEBW107', ['o3', 'temp'],
-                             'datetime', 'variables', 'o3')
+                             'datetime', 'variables', 'o3', start=2010, end=2014)
 
     class DummyDataPrep:
         def __init__(self, data):
@@ -40,13 +42,14 @@ class TestDataGenerator:
         assert gen.window_history_size == 7
         assert gen.window_lead_time == 4
         assert gen.transform_method == "standardise"
-        assert gen.kwargs == {}
+        assert gen.kwargs == {"start": 2010, "end": 2014}
 
     def test_repr(self, gen):
         path = os.path.join(os.path.dirname(__file__), 'data')
         assert gen.__repr__().rstrip() == f"DataGenerator(path='{path}', network='AIRBASE', stations=['DEBW107'], "\
                                           f"variables=['o3', 'temp'], station_type=None, interpolate_dim='datetime', " \
-                                          f"target_dim='variables', target_var='o3', **{{}})".rstrip()
+                                          f"target_dim='variables', target_var='o3', **{{'start': 2010, 'end': 2014}})"\
+            .rstrip()
 
     def test_len(self, gen):
         assert len(gen) == 1
@@ -97,7 +100,7 @@ class TestDataGenerator:
     def test_get_data_generator(self, gen):
         gen.kwargs = {"statistics_per_var": {'o3': 'dma8eu', 'temp': 'maximum'}}
-        file = os.path.join(gen.data_path_tmp, f"DEBW107_{'_'.join(sorted(gen.variables))}.pickle")
+        file = os.path.join(gen.data_path_tmp, f"DEBW107_{'_'.join(sorted(gen.variables))}_None_None_.pickle")
         if os.path.exists(file):
             os.remove(file)
         assert not os.path.exists(file)
@@ -111,7 +114,7 @@ class TestDataGenerator:
         assert os.stat(file).st_ctime > t
 
     def test_save_pickle_data(self, gen):
-        file = os.path.join(gen.data_path_tmp, f"DEBW107_{'_'.join(sorted(gen.variables))}.pickle")
+        file = os.path.join(gen.data_path_tmp, f"DEBW107_{'_'.join(sorted(gen.variables))}_2010_2014_.pickle")
         if os.path.exists(file):
             os.remove(file)
         assert not os.path.exists(file)
@@ -121,7 +124,7 @@ class TestDataGenerator:
         os.remove(file)
 
     def test_load_pickle_data(self, gen):
-        file = os.path.join(gen.data_path_tmp, f"DEBW107_{'_'.join(sorted(gen.variables))}.pickle")
+        file = os.path.join(gen.data_path_tmp, f"DEBW107_{'_'.join(sorted(gen.variables))}_2010_2014_.pickle")
         data = self.DummyDataPrep(np.ones((10, 2)))
         with open(file, "wb") as f:
             pickle.dump(data, f)
diff --git a/test/test_data_handling/test_data_preparation.py b/test/test_data_handling/test_data_preparation.py
index d67b8addc8a2a4ab524c97f6ee6e25c80a7ba1ea..72bacaf9cc1e5a9dc9736b8e8eb7161f35d8ea69 100644
--- a/test/test_data_handling/test_data_preparation.py
+++ b/test/test_data_handling/test_data_preparation.py
@@ -1,14 +1,15 @@
-import pytest
-import os
-from src.data_handling.data_preparation import DataPrep
-from src.join import EmptyQueryResult
-import numpy as np
-import xarray as xr
 import datetime as dt
-import pandas as pd
+import os
 from operator import itemgetter
 import logging
-from src.helpers import PyTestRegex
+
+import numpy as np
+import pandas as pd
+import pytest
+import xarray as xr
+
+from src.data_handling.data_preparation import DataPrep
+from src.join import EmptyQueryResult
 
 
 class TestDataPrep:
@@ -27,6 +28,7 @@ class TestDataPrep:
         d.station = ['DEBW107']
         d.variables = ['o3', 'temp']
         d.station_type = "background"
+        d.sampling = "daily"
         d.kwargs = None
         return d
diff --git a/test/test_datastore.py b/test/test_datastore.py
index e7510cffacafb4a6db6006097f48992fb6a10e55..95a58deafc915dd6193960e77bb99cc8ab8d85cb 100644
--- a/test/test_datastore.py
+++ b/test/test_datastore.py
@@ -2,9 +2,10 @@
 __author__ = 'Lukas Leufen'
 __date__ = '2019-11-22'
 
+import pytest
+
 from src.datastore import AbstractDataStore, DataStoreByVariable, DataStoreByScope
 from src.datastore import NameNotFoundInDataStore, NameNotFoundInScope, EmptyScope
-import pytest
 
 
 class TestAbstractDataStore:
diff --git a/test/test_helpers.py b/test/test_helpers.py
index 463007dc361d21df934bb239b3ecac2fc86882ad..b807d2b8612b9ee006bff43f1ae4cfcfd2dd07e1 100644
--- a/test/test_helpers.py
+++ b/test/test_helpers.py
@@ -1,13 +1,13 @@
-import pytest
-from src.helpers import *
 import logging
 import os
+import platform
+
 import keras
-import numpy as np
 import mock
-import platform
+import numpy as np
+import pytest
 
-from src.model_modules.keras_extensions import LearningRateDecay
+from src.helpers import *
 
 
 class TestToList:
diff --git a/test/test_join.py b/test/test_join.py
index 865ae80dfaaa0244eb7592e65ef134a23b36634c..fe3d33d6296c16bfc72675bc1737aad12ee3c8b9 100644
--- a/test/test_join.py
+++ b/test/test_join.py
@@ -1,15 +1,18 @@
 from typing import Iterable
-import datetime as dt
+
 import pytest
 
 from src.join import *
 from src.join import _save_to_pandas, _correct_stat_name, _lower_list
+from src.join_settings import join_settings
 
 
 class TestJoinUrlBase:
 
     def test_url(self):
-        assert join_url_base == 'https://join.fz-juelich.de/services/rest/surfacedata/'
+        url, headers = join_settings()
+        assert url == 'https://join.fz-juelich.de/services/rest/surfacedata/'
+        assert headers == {}
 
 
 class TestDownloadJoin:
@@ -25,22 +28,35 @@
         assert e.value.args[-1] == "No data found in JOIN."
 
 
+class TestCorrectDataFormat:
+
+    def test_correct_data_format(self):
+        list_data = [["2020-01-01 06:00:01", 23.], ["2020-01-01 06:00:11", 24.], ["2020-01-01 06:00:21", 25.],
+                     ["2020-01-01 06:00:31", 26.], ["2020-01-01 06:00:41", 27.], ["2020-01-01 06:00:51", 23.],
+                     {"station": "test_station_001", "author": "ME", "success": True}]
+        dict_data = correct_data_format(list_data)
+        assert dict_data == {"datetime": ["2020-01-01 06:00:01", "2020-01-01 06:00:11", "2020-01-01 06:00:21",
+                                          "2020-01-01 06:00:31", "2020-01-01 06:00:41", "2020-01-01 06:00:51"],
+                             "values": [23., 24., 25., 26., 27., 23.],
+                             "metadata": {"station": "test_station_001", "author": "ME", "success": True}}
+
+
 class TestGetData:
 
     def test(self):
-        opts = {"base": join_url_base, "service": "series", "station_id": 'DEBW107', "network_name": "UBA",
+        opts = {"base": join_settings()[0], "service": "series", "station_id": 'DEBW107', "network_name": "UBA",
                 "parameter_name": "o3,no2"}
-        assert get_data(opts) == [[17057, 'UBA', 'DEBW107', 'O3'], [17058, 'UBA', 'DEBW107', 'NO2']]
+        assert get_data(opts, headers={}) == [[17057, 'UBA', 'DEBW107', 'O3'], [17058, 'UBA', 'DEBW107', 'NO2']]
 
 
 class TestLoadSeriesInformation:
 
     def test_standard_query(self):
         expected_subset = {'o3': 23031, 'no2': 39002, 'temp--lubw': 17059, 'wspeed': 17060}
-        assert expected_subset.items() <= load_series_information(['DEBW107'], None, None).items()
+        assert expected_subset.items() <= load_series_information(['DEBW107'], None, None, join_settings()[0], {}).items()
 
     def test_empty_result(self):
-        assert load_series_information(['DEBW107'], "traffic", None) == {}
+        assert load_series_information(['DEBW107'], "traffic", None, join_settings()[0], {}) == {}
 
 
 class TestSaveToPandas:
@@ -53,6 +69,10 @@
     def date(self):
         return ['1997-01-01 00:00', '1997-01-02 00:00', '1997-01-03 00:00', '1997-01-04 00:00']
 
+    @pytest.fixture
+    def date_len19(self):
+        return ['1997-01-01 00:00:00', '1997-01-02 00:00:00', '1997-01-03 00:00:00', '1997-01-04 00:00:00']
+
     @pytest.fixture
     def values(self):
         return [86.21, 94.76, 76.96, 99.89]
@@ -75,6 +95,10 @@
         df_concat = pd.concat([create_df, next_df], axis=1)
         assert pd.testing.assert_frame_equal(df_concat, _save_to_pandas(create_df, data, 'max', 'temperature')) is None
 
+    def test_alternative_date_format(self, date_len19, values, create_df):
+        data = {'datetime': date_len19, 'mean': values, 'metadata': None}
+        assert pd.testing.assert_frame_equal(create_df, _save_to_pandas(None, data, 'mean', 'cloudcover')) is None
+
 
 class TestCorrectStatName:
diff --git a/test/test_model_modules/test_inception_model.py b/test/test_model_modules/test_inception_model.py
index b2a923c47c4351ed4c2f543a4e30d25fdbaa58ea..aa5cb284ab196d733e04a9882fa4d5a4ef639a6d 100644
--- a/test/test_model_modules/test_inception_model.py
+++ b/test/test_model_modules/test_inception_model.py
@@ -1,6 +1,7 @@
+import keras
 import pytest
+
 from src.model_modules.inception_model import InceptionModelBase
-import keras
 
 
 class TestInceptionModelBase:
diff --git a/test/test_model_modules/test_keras_extensions.py b/test/test_model_modules/test_keras_extensions.py
index 7c32844d54e88f61690e65885b8997e98a698ff5..17ab4f6d65c95a5a54c9d931818f889acadef532 100644
--- a/test/test_model_modules/test_keras_extensions.py
+++ b/test/test_model_modules/test_keras_extensions.py
@@ -1,8 +1,11 @@
-import pytest
-from src.model_modules.keras_extensions import *
-from src.helpers import l_p_loss
 import keras
 import numpy as np
+import pytest
+import mock
+import os
+
+from src.helpers import l_p_loss
+from src.model_modules.keras_extensions import *
 
 
 class TestHistoryAdvanced:
@@ -59,3 +62,172 @@
         model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(2))
         model.fit(np.array([1, 0, 2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=5, callbacks=[lr_decay])
         assert lr_decay.lr['lr'] == [0.02, 0.02, 0.02 * 0.95, 0.02 * 0.95, 0.02 * 0.95 * 0.95]
+
+
+class TestModelCheckpointAdvanced:
+
+    @pytest.fixture()
+    def callbacks(self):
+        callbacks_name = os.path.join(os.path.dirname(__file__), "callback_%s")
+        return [{"callback": LearningRateDecay(), "path": callbacks_name % "lr"},
+                {"callback": HistoryAdvanced(), "path": callbacks_name % "hist"}]
+
+    @pytest.fixture
+    def ckpt(self, callbacks):
+        ckpt_name = "ckpt.test"
+        return ModelCheckpointAdvanced(filepath=ckpt_name, monitor='val_loss', save_best_only=True, callbacks=callbacks, verbose=1)
+
+    def test_init(self, ckpt, callbacks):
+        assert ckpt.callbacks == callbacks
+        assert ckpt.monitor == "val_loss"
+        assert ckpt.save_best_only is True
+        assert ckpt.best == np.inf
+
+    def test_update_best(self, ckpt):
+        hist = HistoryAdvanced()
+        hist.history["val_loss"] = [10, 6]
+        ckpt.update_best(hist)
+        assert ckpt.best == 6
+
+    def test_update_callbacks(self, ckpt, callbacks):
+        ckpt.update_callbacks(callbacks[0])
+        assert ckpt.callbacks == [callbacks[0]]
+
+    def test_on_epoch_end(self, ckpt):
+        path = os.path.dirname(__file__)
+        ckpt.set_model(mock.MagicMock())
+        ckpt.best = 6
+        ckpt.on_epoch_end(0, {"val_loss": 6})
+        assert "callback_hist" not in os.listdir(path)
+        ckpt.on_epoch_end(9, {"val_loss": 10})
+        assert "callback_hist" not in os.listdir(path)
+        ckpt.on_epoch_end(10, {"val_loss": 4})
+        assert "callback_hist" in os.listdir(path)
+        os.remove(os.path.join(path, "callback_hist"))
+        os.remove(os.path.join(path, "callback_lr"))
+        ckpt.save_best_only = False
+        ckpt.on_epoch_end(10, {"val_loss": 3})
+        assert "callback_hist" in os.listdir(path)
+        os.remove(os.path.join(path, "callback_hist"))
+        os.remove(os.path.join(path, "callback_lr"))
+
+
+class TestCallbackHandler:
+
+    @pytest.fixture
+    def clbk_handler(self):
+        return CallbackHandler()
+
+    @pytest.fixture
+    def clbk_handler_with_dummies(self, clbk_handler):
+        clbk_handler.add_callback("callback_new_instance", "this_path")
+        clbk_handler.add_callback("callback_other", "otherpath", "other_clbk")
+        return clbk_handler
+
+    @pytest.fixture
+    def callback_handler(self, clbk_handler):
+        clbk_handler.add_callback(HistoryAdvanced(), "callbacks_hist.pickle", "hist")
+        clbk_handler.add_callback(LearningRateDecay(), "callbacks_lr.pickle", "lr")
+        return clbk_handler
+
+    @pytest.fixture
+    def prepare_pickle_files(self):
+        hist = HistoryAdvanced()
+        hist.epoch = [1, 2, 3]
+        hist.history = {"val_loss": [10, 5, 4]}
+        lr = LearningRateDecay()
+        lr.epoch = [1, 2, 3]
+        pickle.dump(hist, open("callbacks_hist.pickle", "wb"))
+        pickle.dump(lr, open("callbacks_lr.pickle", "wb"))
+        yield
+        os.remove("callbacks_hist.pickle")
+        os.remove("callbacks_lr.pickle")
+
+    def test_init(self, clbk_handler):
+        assert len(clbk_handler._CallbackHandler__callbacks) == 0
+        assert clbk_handler._checkpoint is None
+        assert clbk_handler.editable is True
+
+    def test_callbacks_set(self, clbk_handler):
+        clbk_handler._callbacks = ("default", "callback_instance", "callback_path")
+        assert clbk_handler._CallbackHandler__callbacks == [{"name": "default", "default": "callback_instance",
+                                                             "path": "callback_path"}]
+        clbk_handler._callbacks = ("another", "callback_instance2", "callback_path")
+        assert clbk_handler._CallbackHandler__callbacks == [{"name": "default", "default": "callback_instance",
+                                                             "path": "callback_path"},
+                                                            {"name": "another", "another": "callback_instance2",
+                                                             "path": "callback_path"}]
+
+    def test_callbacks_get(self, clbk_handler):
+        clbk_handler._callbacks = ("default", "callback_instance", "callback_path")
+        clbk_handler._callbacks = ("another", "callback_instance2", "callback_path2")
+        assert clbk_handler._callbacks == [{"callback": "callback_instance", "path": "callback_path"},
+                                           {"callback": "callback_instance2", "path": "callback_path2"}]
+
+    def test_update_callback(self, clbk_handler_with_dummies):
+        clbk_handler_with_dummies._update_callback(0, "old_instance")
+        assert clbk_handler_with_dummies.get_callbacks() == [{"callback": "old_instance", "path": "this_path"},
+                                                             {"callback": "callback_other", "path": "otherpath"}]
+
+    def test_add_callback(self, clbk_handler):
+        clbk_handler.add_callback("callback_new_instance", "this_path")
+        assert clbk_handler._CallbackHandler__callbacks == [{"name": "callback", "callback": "callback_new_instance",
+                                                             "path": "this_path"}]
+        clbk_handler.add_callback("callback_other", "otherpath", "other_clbk")
+        assert clbk_handler._CallbackHandler__callbacks == [{"name": "callback", "callback": "callback_new_instance",
+                                                             "path": "this_path"},
+                                                            {"name": "other_clbk", "other_clbk": "callback_other",
+                                                             "path": "otherpath"}]
+
+    def test_get_callbacks_as_dict(self, clbk_handler_with_dummies):
+        clbk = clbk_handler_with_dummies
+        assert clbk.get_callbacks() == [{"callback": "callback_new_instance", "path": "this_path"},
+                                        {"callback": "callback_other", "path": "otherpath"}]
+        assert clbk.get_callbacks() == clbk.get_callbacks(as_dict=True)
+
+    def test_get_callbacks_no_dict(self, clbk_handler_with_dummies):
+        assert clbk_handler_with_dummies.get_callbacks(as_dict=False) == ["callback_new_instance", "callback_other"]
+
+    def test_get_callback_by_name(self, clbk_handler_with_dummies):
+        assert clbk_handler_with_dummies.get_callback_by_name("other_clbk") == "callback_other"
+        assert clbk_handler_with_dummies.get_callback_by_name("callback") is None
+
+    def test__get_callbacks(self, clbk_handler_with_dummies):
+        clbk = clbk_handler_with_dummies
+        assert clbk._get_callbacks() == [{"callback": "callback_new_instance", "path": "this_path"},
+                                         {"callback": "callback_other", "path": "otherpath"}]
+        ckpt = keras.callbacks.ModelCheckpoint("testFilePath")
+        clbk._checkpoint = ckpt
+        assert clbk._get_callbacks() == [{"callback": "callback_new_instance", "path": "this_path"},
+                                         {"callback": "callback_other", "path": "otherpath"},
+                                         {"callback": ckpt, "path": "testFilePath"}]
+
+    def test_get_checkpoint(self, clbk_handler):
+        assert clbk_handler.get_checkpoint() is None
+        clbk_handler._checkpoint = "testCKPT"
+        assert clbk_handler.get_checkpoint() == "testCKPT"
+
+    def test_create_model_checkpoint(self, callback_handler):
+        callback_handler.create_model_checkpoint(filepath="tester_path", verbose=1)
+        assert callback_handler.editable is False
+        assert isinstance(callback_handler._checkpoint, ModelCheckpointAdvanced)
+        assert callback_handler._checkpoint.filepath == "tester_path"
+        assert callback_handler._checkpoint.verbose == 1
+        assert callback_handler._checkpoint.monitor == "val_loss"
+
+    def test_load_callbacks(self, callback_handler, prepare_pickle_files):
+        assert len(callback_handler.get_callback_by_name("hist").epoch) == 0
+        assert len(callback_handler.get_callback_by_name("lr").epoch) == 0
+        callback_handler.load_callbacks()
+        assert len(callback_handler.get_callback_by_name("hist").epoch) == 3
+        assert len(callback_handler.get_callback_by_name("lr").epoch) == 3
+
+    def test_update_checkpoint(self, callback_handler, prepare_pickle_files):
+        assert len(callback_handler.get_callback_by_name("hist").epoch) == 0
+        assert len(callback_handler.get_callback_by_name("lr").epoch) == 0
+        callback_handler.create_model_checkpoint(filepath="tester_path", verbose=1)
+        callback_handler.load_callbacks()
+        callback_handler.update_checkpoint()
+        assert len(callback_handler.get_callback_by_name("hist").epoch) == 3
+        assert len(callback_handler.get_callback_by_name("lr").epoch) == 3
+        assert callback_handler._checkpoint.best == 4
diff --git a/test/test_model_modules/test_model_class.py b/test/test_model_modules/test_model_class.py
index 0af16012336c9dbcc3133d7dac1f365e276d11bc..0dbd2d9b67a0748bf09eb4f59e1888aae1ea405d 100644
--- a/test/test_model_modules/test_model_class.py
+++ b/test/test_model_modules/test_model_class.py
@@ -1,5 +1,5 @@
-import pytest
 import keras
+import pytest
 
 from src.model_modules.model_class import AbstractModelClass
diff --git a/test/test_modules/test_experiment_setup.py b/test/test_modules/test_experiment_setup.py
index 08062f4c81dc172d47daa0c0592f07f4da254fb0..9e6d17627d1697a2150ea7f74a373a720d2f02ac 100644
--- a/test/test_modules/test_experiment_setup.py
+++ b/test/test_modules/test_experiment_setup.py
@@ -1,11 +1,11 @@
-import pytest
-import logging
 import argparse
+import logging
 import os
 
-from src.run_modules.experiment_setup import ExperimentSetup
+import pytest
+
 from src.helpers import TimeTracking, prepare_host
-from src.datastore import NameNotFoundInScope, NameNotFoundInDataStore
+from src.run_modules.experiment_setup import ExperimentSetup
 
 
 class TestExperimentSetup:
@@ -47,7 +47,8 @@ class TestExperimentSetup:
         data_store = exp_setup.data_store
         # experiment setup
         assert data_store.get("data_path", "general") == prepare_host()
-        assert data_store.get("trainable", "general") is False
+        assert data_store.get("trainable", "general") is True
+        assert data_store.get("create_new_model", "general") is True
         assert data_store.get("fraction_of_training", "general") == 0.8
         # set experiment name
         assert data_store.get("experiment_name", "general") == "TestExperiment"
@@ -104,13 +105,14 @@ class TestExperimentSetup:
                       target_var="temp", target_dim="target", window_lead_time=10, dimensions="dim1",
                       interpolate_dim="int_dim", interpolate_method="cubic", limit_nan_fill=5, train_start="2000-01-01",
                       train_end="2000-01-02", val_start="2000-01-03", val_end="2000-01-04", test_start="2000-01-05",
-                      test_end="2000-01-06", use_all_stations_on_all_data_sets=False, trainable=True,
-                      fraction_of_train=0.5, experiment_path=experiment_path)
+                      test_end="2000-01-06", use_all_stations_on_all_data_sets=False, trainable=False,
+                      fraction_of_train=0.5, experiment_path=experiment_path, create_new_model=True)
         exp_setup = ExperimentSetup(**kwargs)
         data_store = exp_setup.data_store
         # experiment setup
         assert data_store.get("data_path", "general") == prepare_host()
         assert data_store.get("trainable", "general") is True
+        assert data_store.get("create_new_model", "general") is True
         assert data_store.get("fraction_of_training", "general") == 0.5
         # set experiment name
         assert data_store.get("experiment_name", "general") == "TODAY_network"
@@ -150,10 +152,30 @@ class TestExperimentSetup:
         # use all stations on all data sets (train, val, test)
         assert data_store.get("use_all_stations_on_all_data_sets", "general.test") is False
 
+    def test_init_trainable_behaviour(self):
+        exp_setup = ExperimentSetup(trainable=False, create_new_model=True)
+        data_store = exp_setup.data_store
+        assert data_store.get("trainable", "general") is True
+        assert data_store.get("create_new_model", "general") is True
+        exp_setup = ExperimentSetup(trainable=False, create_new_model=False)
+        data_store = exp_setup.data_store
+        assert data_store.get("trainable", "general") is False
+        assert data_store.get("create_new_model", "general") is False
+        exp_setup = ExperimentSetup(trainable=True, create_new_model=True)
+        data_store = exp_setup.data_store
+        assert data_store.get("trainable", "general") is True
+        assert data_store.get("create_new_model", "general") is True
+        exp_setup = ExperimentSetup(trainable=True, create_new_model=False)
+        data_store = exp_setup.data_store
+        assert data_store.get("trainable", "general") is True
+        assert data_store.get("create_new_model", "general") is False
+
     def test_compare_variables_and_statistics(self):
+        experiment_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "data", "testExperimentFolder"))
         kwargs = dict(parser_args={"experiment_date": "TODAY"}, var_all_dict={'o3': 'dma8eu', 'temp': 'maximum'},
-                      stations=['DEBY053', 'DEBW059', 'DEBW027'], variables=["o3", "relhum"], statistics_per_var=None)
+                      stations=['DEBY053', 'DEBW059', 'DEBW027'], variables=["o3", "relhum"], statistics_per_var=None,
+                      experiment_path=experiment_path)
         with pytest.raises(ValueError) as e:
             ExperimentSetup(**kwargs)
         assert "for the variables: {'relhum'}" in e.value.args[0]
diff --git a/test/test_modules/test_model_setup.py b/test/test_modules/test_model_setup.py
index 65c683003d173031115164b4800d7759ff9cec2f..ade35a244601d138d22af6305e67b5aeae964680 100644
--- a/test/test_modules/test_model_setup.py
+++ b/test/test_modules/test_model_setup.py
@@ -1,13 +1,12 @@
-import pytest
 import os
-import keras
-import mock
 
-from src.run_modules.model_setup import ModelSetup
-from src.run_modules.run_environment import RunEnvironment
+import pytest
+
 from src.data_handling.data_generator import DataGenerator
-from src.model_modules.model_class import AbstractModelClass
 from src.datastore import EmptyScope
+from src.model_modules.model_class import AbstractModelClass
+from src.run_modules.model_setup import ModelSetup
+from src.run_modules.run_environment import RunEnvironment
 
 
 class TestModelSetup:
@@ -21,6 +20,7 @@ class TestModelSetup:
         obj.callbacks_name = "placeholder_%s_str.pickle"
         obj.data_store.set("lr_decay", "dummy_str", "general.model")
         obj.data_store.set("hist", "dummy_str", "general.model")
+        obj.model_name = "%s.h5"
         yield obj
         RunEnvironment().__del__()
@@ -56,11 +56,11 @@ class TestModelSetup:
     def current_scope_as_set(model_cls):
         return set(model_cls.data_store.search_scope(model_cls.scope, current_scope_only=True))
 
-    def test_set_checkpoint(self, setup):
-        assert "general.modeltest" not in setup.data_store.search_name("checkpoint")
+    def test_set_callbacks(self, setup):
+        assert "general.modeltest" not in setup.data_store.search_name("callbacks")
         setup.checkpoint_name = "TestName"
-        setup._set_checkpoint()
-        assert "general.modeltest" in setup.data_store.search_name("checkpoint")
+        setup._set_callbacks()
+        assert "general.modeltest" in setup.data_store.search_name("callbacks")
 
     def test_get_model_settings(self, setup_with_model):
         with pytest.raises(EmptyScope):
diff --git a/test/test_modules/test_post_processing.py b/test/test_modules/test_post_processing.py
index e6a245ccb27807d59114d0a7c69e472d98bc1e58..67897451eaa5de065b644d1f9868a846cb57d84e 100644
--- a/test/test_modules/test_post_processing.py
+++ b/test/test_modules/test_post_processing.py
@@ -1,8 +1,3 @@
-import keras
-
-from src.run_modules.post_processing import PostProcessing
-
-
 class TestPostProcessing:
 
     def test_init(self):
diff --git a/test/test_modules/test_pre_processing.py b/test/test_modules/test_pre_processing.py
index c6f70169adee6eec74c834d952721f41d3c3fa03..9d1feb03ac71b980f6a4cd1b0e6cac2a52d9625b 100644
--- a/test/test_modules/test_pre_processing.py
+++ b/test/test_modules/test_pre_processing.py
@@ -1,11 +1,12 @@
 import logging
+
 import pytest
 
+from src.data_handling.data_generator import DataGenerator
+from src.datastore import NameNotFoundInScope
 from src.helpers import PyTestRegex
 from src.run_modules.experiment_setup import ExperimentSetup
 from src.run_modules.pre_processing import PreProcessing, DEFAULT_ARGS_LIST, DEFAULT_KWARGS_LIST
-from src.data_handling.data_generator import DataGenerator
-from src.datastore import NameNotFoundInScope
 from src.run_modules.run_environment import RunEnvironment
diff --git a/test/test_modules/test_training.py b/test/test_modules/test_training.py
index 08b9eaf19e831ed1662efbe21f4ad29d18dff9b4..31c673f05d055eb7c4ee76318711de030d97d480 100644
--- a/test/test_modules/test_training.py
+++ b/test/test_modules/test_training.py
@@ -1,21 +1,22 @@
+import glob
+import json
+import logging
+import os
+import shutil
+
 import keras
+import mock
 import pytest
 from keras.callbacks import ModelCheckpoint, History
-import mock
-import os
-import json
-import shutil
-import logging
-import glob
 
-from src.model_modules.inception_model import InceptionModelBase
-from src.model_modules.flatten import flatten_tail
-from src.run_modules.training import Training
-from src.run_modules.run_environment import RunEnvironment
 from src.data_handling.data_distributor import Distributor
 from src.data_handling.data_generator import DataGenerator
 from src.helpers import PyTestRegex
-from src.model_modules.keras_extensions import LearningRateDecay, HistoryAdvanced
+from src.model_modules.flatten import flatten_tail
+from src.model_modules.inception_model import InceptionModelBase
+from src.model_modules.keras_extensions import LearningRateDecay, HistoryAdvanced, CallbackHandler
+from src.run_modules.run_environment import RunEnvironment
+from src.run_modules.training import Training
 
 
 def my_test_model(activation, window_history_size, channels, dropout_rate, add_minor_branch=False):
@@ -38,7 +39,7 @@ class TestTraining:
 
     @pytest.fixture
-    def init_without_run(self, path: str, model: keras.Model, checkpoint: ModelCheckpoint):
+    def init_without_run(self, path: str, model: keras.Model, callbacks: CallbackHandler):
         obj = object.__new__(Training)
         super(Training, obj).__init__()
         obj.model = model
@@ -47,19 +48,22 @@ class TestTraining:
         obj.test_set = None
         obj.batch_size = 256
         obj.epochs = 2
-        obj.checkpoint = checkpoint
-        obj.lr_sc = LearningRateDecay()
-        obj.hist = HistoryAdvanced()
+        clbk, hist, lr = callbacks
+        obj.callbacks = clbk
+        obj.lr_sc = lr
+        obj.hist = hist
         obj.experiment_name = "TestExperiment"
         obj.data_store.set("generator", mock.MagicMock(return_value="mock_train_gen"), "general.train")
         obj.data_store.set("generator", mock.MagicMock(return_value="mock_val_gen"), "general.val")
         obj.data_store.set("generator", mock.MagicMock(return_value="mock_test_gen"), "general.test")
         os.makedirs(path)
         obj.data_store.set("experiment_path", path, "general")
+        obj.data_store.set("model_name", os.path.join(path, "test_model.h5"), "general.model")
         obj.data_store.set("experiment_name", "TestExperiment", "general")
         path_plot = os.path.join(path, "plots")
         os.makedirs(path_plot)
         obj.data_store.set("plot_path", path_plot, "general")
+        obj._trainable = True
         yield obj
         if os.path.exists(path):
             shutil.rmtree(path)
@@ -67,12 +71,9 @@ class TestTraining:
 
     @pytest.fixture
     def learning_rate(self):
-        return {"lr": [0.01, 0.0094]}
-
-    @pytest.fixture
-    def init_with_lr(self, init_without_run, learning_rate):
-        init_without_run.lr_sc.lr = learning_rate
-        return init_without_run
+        lr = LearningRateDecay()
+        lr.lr = {"lr": [0.01, 0.0094]}
+        return lr
 
     @pytest.fixture
     def history(self):
@@ -102,8 +103,15 @@ class TestTraining:
         return my_test_model(keras.layers.PReLU, 7, 2, 0.1, False)
 
     @pytest.fixture
-    def checkpoint(self, path):
-        return ModelCheckpoint(os.path.join(path, "model_checkpoint"), monitor='val_loss', save_best_only=True)
+    def callbacks(self, path):
+        clbk = CallbackHandler()
+        hist = HistoryAdvanced()
+        clbk.add_callback(hist, os.path.join(path, "hist_checkpoint.pickle"), "hist")
+        lr = LearningRateDecay()
+        clbk.add_callback(lr, os.path.join(path, "lr_checkpoint.pickle"), "lr")
+        clbk.create_model_checkpoint(filepath=os.path.join(path, "model_checkpoint"), monitor='val_loss',
+                                     save_best_only=True)
+        return clbk, hist, lr
 
     @pytest.fixture
     def ready_to_train(self, generator: DataGenerator, init_without_run: Training):
@@ -122,7 +130,7 @@ class TestTraining:
         return obj
 
     @pytest.fixture
-    def ready_to_init(self, generator, model, checkpoint, path):
+    def ready_to_init(self, generator, model, callbacks, path):
         os.makedirs(path)
         obj = RunEnvironment()
         obj.data_store.set("generator", generator, "general.train")
@@ -130,13 +138,17 @@ class TestTraining:
         obj.data_store.set("generator", generator, "general.test")
         model.compile(optimizer=keras.optimizers.SGD(), loss=keras.losses.mean_absolute_error)
         obj.data_store.set("model", model, "general.model")
+        obj.data_store.set("model_name", os.path.join(path, "test_model.h5"), "general.model")
         obj.data_store.set("batch_size", 256, "general.model")
         obj.data_store.set("epochs", 2, "general.model")
-        obj.data_store.set("checkpoint", checkpoint, "general.model")
-        obj.data_store.set("lr_decay", LearningRateDecay(), "general.model")
-        obj.data_store.set("hist", HistoryAdvanced(), "general.model")
+        clbk, hist, lr = callbacks
+        obj.data_store.set("callbacks", clbk, "general.model")
+        obj.data_store.set("lr_decay", lr, "general.model")
+        obj.data_store.set("hist", hist, "general.model")
         obj.data_store.set("experiment_name", "TestExperiment", "general")
         obj.data_store.set("experiment_path", path, "general")
+        obj.data_store.set("trainable", True, "general")
+        obj.data_store.set("create_new_model", True, "general")
         path_plot = os.path.join(path, "plots")
         os.makedirs(path_plot)
         obj.data_store.set("plot_path", path_plot, "general")
@@ -178,7 +190,7 @@ class TestTraining:
 
     def test_save_model(self, init_without_run, path, caplog):
         caplog.set_level(logging.DEBUG)
-        model_name = "TestExperiment_my_model.h5"
+        model_name = "test_model.h5"
         assert model_name not in os.listdir(path)
         init_without_run.save_model()
         assert caplog.record_tuples[0] == ("root", 10, PyTestRegex(f"save best model to {os.path.join(path, model_name)}"))
@@ -190,25 +202,25 @@ class TestTraining:
         assert caplog.record_tuples[0] == ("root", 10, PyTestRegex("load best model: notExisting"))
         assert caplog.record_tuples[1] == ("root", 20, PyTestRegex("no weights to reload..."))
 
-    def test_save_callbacks_history_created(self, init_without_run, history, path):
-        init_without_run.save_callbacks_as_json(history)
+    def test_save_callbacks_history_created(self, init_without_run, history, learning_rate, path):
+        init_without_run.save_callbacks_as_json(history, learning_rate)
         assert "history.json" in os.listdir(path)
 
-    def test_save_callbacks_lr_created(self, init_with_lr, history, path):
-        init_with_lr.save_callbacks_as_json(history)
+    def test_save_callbacks_lr_created(self, init_without_run, history, learning_rate, path):
+        init_without_run.save_callbacks_as_json(history, learning_rate)
        assert "history_lr.json" in os.listdir(path)
 
-    def test_save_callbacks_inspect_history(self, init_without_run, history, path):
-        init_without_run.save_callbacks_as_json(history)
+    def test_save_callbacks_inspect_history(self, init_without_run, history, learning_rate, path):
+        init_without_run.save_callbacks_as_json(history, learning_rate)
         with open(os.path.join(path, "history.json")) as jfile:
             hist = json.load(jfile)
             assert hist == history.history
 
-    def test_save_callbacks_inspect_lr(self, init_with_lr, history, path):
-        init_with_lr.save_callbacks_as_json(history)
+    def test_save_callbacks_inspect_lr(self, init_without_run, history, learning_rate, path):
+        init_without_run.save_callbacks_as_json(history, learning_rate)
         with open(os.path.join(path, "history_lr.json")) as jfile:
             lr = json.load(jfile)
-            assert lr == init_with_lr.lr_sc.lr
+            assert lr == learning_rate.lr
 
     def test_create_monitoring_plots(self, init_without_run, learning_rate, history, path):
         assert len(glob.glob(os.path.join(path, "plots", "TestExperiment_history_*.pdf"))) == 0
diff --git a/test/test_plotting/test_training_monitoring.py b/test/test_plotting/test_training_monitoring.py
index 358a19adf5c81c90f7e77b787ca7b50923990f00..7e4e21c1a28b35bef4aa6e613756378fe41611b5 100644
--- a/test/test_plotting/test_training_monitoring.py
+++ b/test/test_plotting/test_training_monitoring.py
@@ -1,9 +1,10 @@
+import os
+
 import keras
 import pytest
-import os
 
-from src.plotting.training_monitoring import PlotModelLearningRate, PlotModelHistory
 from src.model_modules.keras_extensions import LearningRateDecay
+from src.plotting.training_monitoring import PlotModelLearningRate, PlotModelHistory
 
 
 @pytest.fixture
diff --git a/test/test_statistics.py b/test/test_statistics.py
index fa7de6022f0957031f90fbf951679594583eccab..308ac655787e69f90b45e65e7e7df8f35875f652 100644
--- a/test/test_statistics.py
+++ b/test/test_statistics.py
@@ -1,7 +1,8 @@
+import numpy as np
+import pandas as pd
 import pytest
 import xarray as xr
-import pandas as pd
-import numpy as np
+
 from src.statistics import standardise, standardise_inverse, centre, centre_inverse