diff --git a/mlair/configuration/defaults.py b/mlair/configuration/defaults.py index d61146b61a5ade9118675fa7b895212f310acc71..242dcd31d01134b75f70a12c2708d4c99bb94301 100644 --- a/mlair/configuration/defaults.py +++ b/mlair/configuration/defaults.py @@ -13,6 +13,7 @@ DEFAULT_START = "1997-01-01" DEFAULT_END = "2017-12-31" DEFAULT_WINDOW_HISTORY_SIZE = 13 DEFAULT_OVERWRITE_LOCAL_DATA = False +DEFAULT_OVERWRITE_LAZY_DATA = False DEFAULT_HPC_LOGIN_LIST = ["ju", "hdfmll"] # ju[wels} #hdfmll(ogin) DEFAULT_HPC_HOST_LIST = ["jw", "hdfmlc"] # first part of node names for Juwels (jw[comp], hdfmlc(ompute). DEFAULT_CREATE_NEW_MODEL = True diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index 8205ae6c28f3683b1052c292e5d063d8bca555dc..5aefb0368ec1cf544443bb5e0412dd16a97f2a7f 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -12,6 +12,7 @@ from mlair.helpers import remove_items from mlair.configuration.defaults import DEFAULT_SAMPLING, DEFAULT_INTERPOLATION_LIMIT, DEFAULT_INTERPOLATION_METHOD from mlair.helpers.filter import filter_width_kzf +import copy import inspect from typing import Callable import datetime as dt @@ -140,13 +141,6 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]: start, end = self.update_start_end(ind) - # if ind == 0: # for inputs - # estimated_filter_width = self.estimate_filter_width() - # start = self._add_time_delta(self.start, -estimated_filter_width) - # end = self._add_time_delta(self.end, estimated_filter_width) - # else: # target - # start, end = self.start, self.end - vars = [self.variables, self.target_var] stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) @@ -264,7 +258,83 @@ class DataHandlerMixedSamplingWithClimateFirFilter(DataHandlerClimateFirFilter): data_handler = DataHandlerMixedSamplingWithClimateFirFilterSingleStation data_handler_transformation = DataHandlerMixedSamplingWithClimateFirFilterSingleStation - _requirements = data_handler.requirements() + data_handler_unfiltered = DataHandlerMixedSamplingSingleStation + _requirements = list(set(data_handler.requirements() + data_handler_unfiltered.requirements())) + DEFAULT_FILTER_ADD_UNFILTERED = False + + def __init__(self, *args, data_handler_class_unfiltered: data_handler_unfiltered = None, + filter_add_unfiltered: bool = DEFAULT_FILTER_ADD_UNFILTERED, **kwargs): + self.dh_unfiltered = data_handler_class_unfiltered + self.filter_add_unfiltered = filter_add_unfiltered + super().__init__(*args, **kwargs) + + @classmethod + def own_args(cls, *args): + """Return all arguments (including kwonlyargs).""" + super_own_args = DataHandlerClimateFirFilter.own_args(*args) + arg_spec = inspect.getfullargspec(cls) + list_of_args = arg_spec.args + arg_spec.kwonlyargs + super_own_args + return remove_items(list_of_args, ["self"] + list(args)) + + def _create_collection(self): + if self.filter_add_unfiltered is True and self.dh_unfiltered is not None: + return [self.id_class, self.dh_unfiltered] + else: + return super()._create_collection() + + @classmethod + def build(cls, station: str, **kwargs): + sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler.requirements() if k in kwargs} + filter_add_unfiltered = kwargs.get("filter_add_unfiltered", False) + sp_keys = cls.build_update_kwargs(sp_keys, dh_type="filtered") + sp = cls.data_handler(station, 
**sp_keys) + if filter_add_unfiltered is True: + sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler_unfiltered.requirements() if k in kwargs} + sp_keys = cls.build_update_kwargs(sp_keys, dh_type="unfiltered") + sp_unfiltered = cls.data_handler_unfiltered(station, **sp_keys) + else: + sp_unfiltered = None + dp_args = {k: copy.deepcopy(kwargs[k]) for k in cls.own_args("id_class") if k in kwargs} + return cls(sp, data_handler_class_unfiltered=sp_unfiltered, **dp_args) + + @classmethod + def build_update_kwargs(cls, kwargs_dict, dh_type="filtered"): + if "transformation" in kwargs_dict: + trafo_opts = kwargs_dict.get("transformation") + if isinstance(trafo_opts, dict): + kwargs_dict["transformation"] = trafo_opts.get(dh_type) + return kwargs_dict + + @classmethod + def transformation(cls, set_stations, tmp_path=None, **kwargs): + + sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls._requirements if k in kwargs} + if "transformation" not in sp_keys.keys(): + return + + transformation_filtered = super().transformation(set_stations, tmp_path=tmp_path, + dh_transformation=cls.data_handler_transformation, **kwargs) + if kwargs.get("filter_add_unfiltered", False) is False: + return transformation_filtered + else: + transformation_unfiltered = super().transformation(set_stations, tmp_path=tmp_path, + dh_transformation=cls.data_handler_unfiltered, **kwargs) + return {"filtered": transformation_filtered, "unfiltered": transformation_unfiltered} + + def get_X_original(self): + if self.use_filter_branches is True: + X = [] + for data in self._collection: + if hasattr(data, "filter_dim"): + X_total = data.get_X() + filter_dim = data.filter_dim + for filter_name in data.filter_dim_order: + X.append(X_total.sel({filter_dim: filter_name}, drop=True)) + else: + X.append(data.get_X()) + return X + else: + return super().get_X_original() class DataHandlerSeparationOfScalesSingleStation(DataHandlerMixedSamplingWithKzFilterSingleStation): diff --git a/mlair/data_handler/data_handler_neighbors.py b/mlair/data_handler/data_handler_neighbors.py index 6c87946eaad5568e1ff59c3988bf8fe469442641..26fe6f8549639019169bf0b894186d354eaddc83 100644 --- a/mlair/data_handler/data_handler_neighbors.py +++ b/mlair/data_handler/data_handler_neighbors.py @@ -1,6 +1,10 @@ __author__ = 'Lukas Leufen' __date__ = '2020-07-17' +""" +WARNING: This data handler is just a prototype and has not been validated to work properly! Use it with caution! 
+""" + import datetime as dt import numpy as np @@ -19,7 +23,7 @@ number = Union[float, int] num_or_list = Union[number, List[number]] -class DataHandlerNeighbors(DefaultDataHandler): +class DataHandlerNeighbors(DefaultDataHandler): # pragma: no cover """Data handler including neighboring stations.""" def __init__(self, id_class, data_path, neighbors=None, min_length=0, @@ -48,7 +52,7 @@ class DataHandlerNeighbors(DefaultDataHandler): return [super(DataHandlerNeighbors, self).get_coordinates()].append(neighbors) -def run_data_prep(): +def run_data_prep(): # pragma: no cover """Comment: methods just to start write meaningful test routines.""" data = DummyDataHandler("main_class") data.get_X() @@ -63,7 +67,7 @@ def run_data_prep(): data_prep.get_data(upsampling=False) -def create_data_prep(): +def create_data_prep(): # pragma: no cover """Comment: methods just to start write meaningful test routines.""" path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata") station_type = None @@ -91,7 +95,7 @@ def create_data_prep(): return data_prep -class DummyDataHandler(AbstractDataHandler): +class DummyDataHandler(AbstractDataHandler): # pragma: no cover def __init__(self, name, number_of_samples=None): """This data handler takes a name argument and the number of samples to generate. If not provided, a random diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index 054713481478826af2c5220f2b9d9e9c08c4a0c2..8e95e76365181ee76f91a91319b912f2626a223a 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -223,7 +223,8 @@ class DataHandlerSingleStation(AbstractDataHandler): elif method == "centre": return statistics.centre_apply(data, mean), {"mean": mean, "method": method} elif method == "min_max": - return statistics.min_max_apply(data, min, max), {"min": min, "max": max, "method": method, + kws = {"feature_range": feature_range} if feature_range is not None else {} + return statistics.min_max_apply(data, min, max, **kws), {"min": min, "max": max, "method": method, "feature_range": feature_range} elif method == "log": return statistics.log_apply(data, mean, std), {"mean": mean, "std": std, "method": method} @@ -416,8 +417,7 @@ class DataHandlerSingleStation(AbstractDataHandler): """ chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5", "propane", "so2", "toluene"] - # used_chem_vars = list(set(chem_vars) & set(self.statistics_per_var.keys())) - used_chem_vars = list(set(chem_vars) & set(data.variables.values)) + used_chem_vars = list(set(chem_vars) & set(data.coords[self.target_dim].values)) if len(used_chem_vars) > 0: data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum) return data @@ -463,11 +463,8 @@ class DataHandlerSingleStation(AbstractDataHandler): :return: this array """ ind = pd.DataFrame({'val': index_value}, index=index_value) - # res = xr.Dataset.from_dataframe(ind).to_array().rename({'index': index_name}).squeeze(dim=squeez/e_dim, drop=True) res = xr.Dataset.from_dataframe(ind).to_array(squeeze_dim).rename({'index': index_name}).squeeze( - dim=squeeze_dim, - drop=True - ) + dim=squeeze_dim, drop=True) res.name = index_name return res @@ -750,8 +747,6 @@ class DataHandlerSingleStation(AbstractDataHandler): if __name__ == "__main__": - # dp = AbstractDataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) - # 
print(dp) statistics_per_var = {'o3': 'dma8eu', 'temp-rea-miub': 'maximum'} sp = DataHandlerSingleStation(data_path='/home/felix/PycharmProjects/mlt_new/data/', station='DEBY122', statistics_per_var=statistics_per_var, station_type='background', diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py index 785eb7dffff28a676342feace519a6db0871c1df..4707fd580562a68fd6b2dc0843551905e70d7e50 100644 --- a/mlair/data_handler/data_handler_with_filter.py +++ b/mlair/data_handler/data_handler_with_filter.py @@ -4,6 +4,7 @@ __author__ = 'Lukas Leufen' __date__ = '2020-08-26' import inspect +import copy import numpy as np import pandas as pd import xarray as xr @@ -12,7 +13,7 @@ from functools import partial import logging from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation from mlair.data_handler import DefaultDataHandler -from mlair.helpers import remove_items, to_list, TimeTrackingWrapper +from mlair.helpers import remove_items, to_list, TimeTrackingWrapper, statistics from mlair.helpers.filter import KolmogorovZurbenkoFilterMovingWindow as KZFilter from mlair.helpers.filter import FIRFilter, ClimateFIRFilter, omega_null_kzf @@ -126,32 +127,16 @@ class DataHandlerFilter(DefaultDataHandler): list_of_args = arg_spec.args + arg_spec.kwonlyargs + super_own_args return remove_items(list_of_args, ["self"] + list(args)) - def get_X_original(self): - if self.use_filter_branches is True: - X = [] - for data in self._collection: - X_total = data.get_X() - filter_dim = data.filter_dim - for filter_name in data.filter_dim_order: - X.append(X_total.sel({filter_dim: filter_name}, drop=True)) - return X - else: - return super().get_X_original() - class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation): """Data handler for a single station to be used by a superior data handler. 
Inputs are FIR filtered.""" _requirements = remove_items(DataHandlerFilterSingleStation.requirements(), "station") - _hash = DataHandlerFilterSingleStation._hash + ["filter_cutoff_period", "filter_order", "filter_window_type", - "_add_unfiltered"] + _hash = DataHandlerFilterSingleStation._hash + ["filter_cutoff_period", "filter_order", "filter_window_type"] DEFAULT_WINDOW_TYPE = ("kaiser", 5) - DEFAULT_ADD_UNFILTERED = False - def __init__(self, *args, filter_cutoff_period, filter_order, filter_window_type=DEFAULT_WINDOW_TYPE, - filter_add_unfiltered=DEFAULT_ADD_UNFILTERED, **kwargs): - # self._check_sampling(**kwargs) + def __init__(self, *args, filter_cutoff_period, filter_order, filter_window_type=DEFAULT_WINDOW_TYPE, **kwargs): # self.original_data = None # ToDo: implement here something to store unfiltered data self.fs = self._get_fs(**kwargs) if filter_window_type == "kzf": @@ -161,7 +146,7 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation): assert len(self.filter_cutoff_period) == (len(filter_order) - len(removed_index)) self.filter_order = self._prepare_filter_order(filter_order, removed_index, self.fs) self.filter_window_type = filter_window_type - self._add_unfiltered = filter_add_unfiltered + self.unfiltered_name = "unfiltered" super().__init__(*args, **kwargs) @staticmethod @@ -223,8 +208,6 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation): self.filter_window_type, self.target_dim) self.fir_coeff = fir.filter_coefficients() fir_data = fir.filtered_data() - if self._add_unfiltered is True: - fir_data.append(self.input_data) self.input_data = xr.concat(fir_data, pd.Index(self.create_filter_index(), name=self.filter_dim)) # this is just a code snippet to check the results of the kz filter # import matplotlib @@ -249,8 +232,6 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation): else: index.append(f"band{band_num}") band_num += 1 - if self._add_unfiltered: - index.append("unfiltered") self.filter_dim_order = index return pd.Index(index, name=self.filter_dim) @@ -261,13 +242,89 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation): _data, _meta, _input_data, _target_data, self.fir_coeff, self.filter_dim_order = lazy_data super(__class__, self)._extract_lazy((_data, _meta, _input_data, _target_data)) + def transform(self, data_in, dim: Union[str, int] = 0, inverse: bool = False, opts=None, + transformation_dim=None): + """ + Transform data according to given transformation settings. + + This function transforms a xarray.dataarray (along dim) or pandas.DataFrame (along axis) either with mean=0 + and std=1 (`method=standardise`) or centers the data with mean=0 and no change in data scale + (`method=centre`). Furthermore, this sets an internal instance attribute for later inverse transformation. This + method will raise an AssertionError if an internal transform method was already set ('inverse=False') or if the + internal transform method, internal mean and internal standard deviation weren't set ('inverse=True'). + + :param string/int dim: This param is not used for inverse transformation. + | for xarray.DataArray as string: name of dimension which should be standardised + | for pandas.DataFrame as int: axis of dimension which should be standardised + :param inverse: Switch between transformation and inverse transformation. + + :return: xarray.DataArrays or pandas.DataFrames: + #. mean: Mean of data + #. std: Standard deviation of data + #. 
data: Standardised data + """ + + if transformation_dim is None: + transformation_dim = self.DEFAULT_TARGET_DIM + + def f(data, method="standardise", feature_range=None): + if method == "standardise": + return statistics.standardise(data, dim) + elif method == "centre": + return statistics.centre(data, dim) + elif method == "min_max": + kwargs = {"feature_range": feature_range} if feature_range is not None else {} + return statistics.min_max(data, dim, **kwargs) + elif method == "log": + return statistics.log(data, dim) + else: + raise NotImplementedError + + def f_apply(data, method, **kwargs): + for k, v in kwargs.items(): + if not (isinstance(v, xr.DataArray) or v is None): + _, opts = statistics.min_max(data, dim) + helper = xr.ones_like(opts['min']) + kwargs[k] = helper * v + mean = kwargs.pop('mean', None) + std = kwargs.pop('std', None) + min = kwargs.pop('min', None) + max = kwargs.pop('max', None) + feature_range = kwargs.pop('feature_range', None) + + if method == "standardise": + return statistics.standardise_apply(data, mean, std), {"mean": mean, "std": std, "method": method} + elif method == "centre": + return statistics.centre_apply(data, mean), {"mean": mean, "method": method} + elif method == "min_max": + return statistics.min_max_apply(data, min, max), {"min": min, "max": max, "method": method, + "feature_range": feature_range} + elif method == "log": + return statistics.log_apply(data, mean, std), {"mean": mean, "std": std, "method": method} + else: + raise NotImplementedError + + opts = opts or {} + opts_updated = {} + if not inverse: + transformed_values = [] + for var in data_in.variables.values: + data_var = data_in.sel(**{transformation_dim: [var]}) + var_opts = opts.get(var, {}) + _apply = (var_opts.get("mean", None) is not None) or (var_opts.get("min") is not None) + values, new_var_opts = locals()["f_apply" if _apply else "f"](data_var, **var_opts) + opts_updated[var] = copy.deepcopy(new_var_opts) + transformed_values.append(values) + return xr.concat(transformed_values, dim=transformation_dim), opts_updated + else: + return self.inverse_transform(data_in, opts, transformation_dim) + class DataHandlerFirFilter(DataHandlerFilter): """Data handler using FIR filtered data.""" data_handler = DataHandlerFirFilterSingleStation data_handler_transformation = DataHandlerFirFilterSingleStation - _requirements = data_handler.requirements() class DataHandlerKzFilterSingleStation(DataHandlerFilterSingleStation): @@ -396,12 +453,6 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation input_data = xr.concat(climate_filter_data, pd.Index(self.create_filter_index(add_unfiltered_index=False), name=self.filter_dim)) - # add unfiltered raw data - if self._add_unfiltered is True: - data_raw = self.shift(self.input_data, self.time_dim, -self.window_history_size) - data_raw = data_raw.expand_dims({self.filter_dim: ["unfiltered"]}, -1) - input_data = xr.concat([input_data, data_raw], self.filter_dim) - self.input_data = input_data # this is just a code snippet to check the results of the filter @@ -422,8 +473,6 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation f = lambda x: int(np.round(x)) if x >= 10 else np.round(x, 1) index = list(map(f, index.tolist())) index = list(map(lambda x: str(x) + "d", index)) + ["res"] - if self._add_unfiltered and add_unfiltered_index: - index.append("unfiltered") self.filter_dim_order = index return pd.Index(index, name=self.filter_dim) @@ -492,11 +541,3 @@ class 
DataHandlerClimateFirFilter(DataHandlerFilter): _requirements = data_handler.requirements() _store_attributes = data_handler.store_attributes() - # def get_X_original(self): - # X = [] - # for data in self._collection: - # X_total = data.get_X() - # filter_dim = data.filter_dim - # for filter in data.filter_dim_order: - # X.append(X_total.sel({filter_dim: filter}, drop=True)) - # return X diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index 8ad3e1e7ff583bd511d6311f2ab9de886f440fc9..6fa7952d4bc0a278f17f767073969822924b6d5f 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -53,6 +53,7 @@ class DefaultDataHandler(AbstractDataHandler): self._Y = None self._X_extreme = None self._Y_extreme = None + self._data_intersection = None self._use_multiprocessing = use_multiprocessing self._max_number_multiprocessing = max_number_multiprocessing _name_affix = str(f"{str(self.id_class)}_{name_affix}" if name_affix is not None else id(self)) @@ -133,7 +134,7 @@ class DefaultDataHandler(AbstractDataHandler): return X, Y def __repr__(self): - return ";".join(list(map(lambda x: str(x), self._collection))) + return str(self._collection[0]) def get_X_original(self): X = [] @@ -172,10 +173,15 @@ class DefaultDataHandler(AbstractDataHandler): else: X = list(map(lambda x: x.sel({dim: intersect}), X_original)) Y = Y_original.sel({dim: intersect}) + self._data_intersection = intersect self._X, self._Y = X, Y def get_observation(self): - return self.id_class.observation.copy().squeeze() + dim = self.time_dim + if self._data_intersection is not None: + return self.id_class.observation.sel({dim: self._data_intersection}).copy().squeeze() + else: + return self.id_class.observation.copy().squeeze() def apply_transformation(self, data, base="target", dim=0, inverse=False): return self.id_class.apply_transformation(data, dim=dim, base=base, inverse=inverse) @@ -248,7 +254,7 @@ class DefaultDataHandler(AbstractDataHandler): d.coords[dim] = d.coords[dim].values + np.timedelta64(*timedelta) @classmethod - def transformation(cls, set_stations, tmp_path=None, **kwargs): + def transformation(cls, set_stations, tmp_path=None, dh_transformation=None, **kwargs): """ ### supported transformation methods @@ -278,31 +284,14 @@ class DefaultDataHandler(AbstractDataHandler): If min and max are not None, the default data handler expects this parameters to match the data and applies this values to the data. Make sure that all dimensions and/or coordinates are in agreement. 
""" + if dh_transformation is None: + dh_transformation = cls.data_handler_transformation - sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls._requirements if k in kwargs} + sp_keys = {k: copy.deepcopy(kwargs[k]) for k in dh_transformation.requirements() if k in kwargs} if "transformation" not in sp_keys.keys(): return transformation_dict = ({}, {}) - def _inner(): - """Inner method that is performed in both serial and parallel approach.""" - if dh is not None: - for i, transformation in enumerate(dh._transformation): - for var in transformation.keys(): - if var not in transformation_dict[i].keys(): - transformation_dict[i][var] = {} - opts = transformation[var] - if not transformation_dict[i][var].get("method", opts["method"]) == opts["method"]: - # data handlers with filters are allowed to change transformation method to standardise - assert hasattr(dh, "filter_dim") and opts["method"] == "standardise" - transformation_dict[i][var]["method"] = opts["method"] - for k in ["mean", "std", "min", "max"]: - old = transformation_dict[i][var].get(k, None) - new = opts.get(k) - transformation_dict[i][var][k] = new if old is None else old.combine_first(new) - if "feature_range" in opts.keys(): - transformation_dict[i][var]["feature_range"] = opts.get("feature_range", None) - max_process = kwargs.get("max_number_multiprocessing", 16) n_process = min([psutil.cpu_count(logical=False), len(set_stations), max_process]) # use only physical cpus if n_process > 1 and kwargs.get("use_multiprocessing", True) is True: # parallel solution @@ -311,24 +300,29 @@ class DefaultDataHandler(AbstractDataHandler): logging.info(f"running {getattr(pool, '_processes')} processes in parallel") sp_keys.update({"tmp_path": tmp_path, "return_strategy": "reference"}) output = [ - pool.apply_async(f_proc, args=(cls.data_handler_transformation, station), kwds=sp_keys) + pool.apply_async(f_proc, args=(dh_transformation, station), kwds=sp_keys) for station in set_stations] for p in output: _res_file, s = p.get() with open(_res_file, "rb") as f: dh = dill.load(f) os.remove(_res_file) - _inner() + transformation_dict = cls.update_transformation_dict(dh, transformation_dict) pool.close() else: # serial solution logging.info("use serial transformation approach") sp_keys.update({"return_strategy": "result"}) for station in set_stations: - dh, s = f_proc(cls.data_handler_transformation, station, **sp_keys) - _inner() + dh, s = f_proc(dh_transformation, station, **sp_keys) + transformation_dict = cls.update_transformation_dict(dh, transformation_dict) # aggregate all information iter_dim = sp_keys.get("iter_dim", cls.DEFAULT_ITER_DIM) + transformation_dict = cls.aggregate_transformation(transformation_dict, iter_dim) + return transformation_dict + + @classmethod + def aggregate_transformation(cls, transformation_dict, iter_dim): pop_list = [] for i, transformation in enumerate(transformation_dict): for k in transformation.keys(): @@ -349,6 +343,27 @@ class DefaultDataHandler(AbstractDataHandler): transformation_dict[i].pop(k) return transformation_dict + @classmethod + def update_transformation_dict(cls, dh, transformation_dict): + """Inner method that is performed in both serial and parallel approach.""" + if dh is not None: + for i, transformation in enumerate(dh._transformation): + for var in transformation.keys(): + if var not in transformation_dict[i].keys(): + transformation_dict[i][var] = {} + opts = transformation[var] + if not transformation_dict[i][var].get("method", opts["method"]) == opts["method"]: + # data handlers with 
filters are allowed to change transformation method to standardise + assert hasattr(dh, "filter_dim") and opts["method"] == "standardise" + transformation_dict[i][var]["method"] = opts["method"] + for k in ["mean", "std", "min", "max"]: + old = transformation_dict[i][var].get(k, None) + new = opts.get(k) + transformation_dict[i][var][k] = new if old is None else old.combine_first(new) + if "feature_range" in opts.keys(): + transformation_dict[i][var]["feature_range"] = opts.get("feature_range", None) + return transformation_dict + def get_coordinates(self): return self.id_class.get_coordinates() diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index a63cef975888162f335e4528c2f99bdfc7a892d5..543cff3624577ac617733b8b593c5f52f25196b3 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -173,8 +173,10 @@ class ClimateFIRFilter: # visualize if self.plot_path is not None: - self.PlotClimateFirFilter(self.plot_path, self.plot_data, sampling, plot_name) - # self._plot(sampling, new_dim=new_dim) + try: + self.PlotClimateFirFilter(self.plot_path, self.plot_data, sampling, plot_name) + except Exception as e: + logging.info(f"Could not plot climate fir filter due to following reason:\n{e}") @staticmethod def _minimum_length(order, minimum_length, pos, window): @@ -382,7 +384,7 @@ class ClimateFIRFilter: _end = pd.to_datetime(data.coords[time_dim].max().values).year filt_coll = [] for _year in range(_start, _end + 1): - logging.info(f"{data.coords['Stations'].values[0]} ({var}): year={_year}") + logging.debug(f"{data.coords['Stations'].values[0]} ({var}): year={_year}") time_slice = self._create_time_range_extend(_year, sampling, extend_length_history) d = data.sel({var_dim: [var], time_dim: time_slice}) @@ -504,137 +506,6 @@ class ClimateFIRFilter: res.name = index_name return res - def _plot(self, sampling, new_dim="window"): - h = None - td_type = {"1d": "D", "1H": "h"}.get(sampling) - if self.plot_path is None: - return - plot_folder = os.path.join(os.path.abspath(self.plot_path), "climFIR") - if not os.path.exists(plot_folder): - os.makedirs(plot_folder) - - # set plot parameter - rc_params = {'axes.labelsize': 'large', - 'xtick.labelsize': 'large', - 'ytick.labelsize': 'large', - 'legend.fontsize': 'medium', - 'axes.titlesize': 'large', - } - plt.rcParams.update(rc_params) - - plot_dict = {} - for i, o in enumerate(range(len(self.plot_data))): - plot_data = self.plot_data[i] - for p_d in plot_data: - var = p_d.get("var") - t0 = p_d.get("t0") - filter_input = p_d.get("filter_input") - filter_input_nc = p_d.get("filter_input_nc") - valid_range = p_d.get("valid_range") - time_range = p_d.get("time_range") - new_dim = p_d.get("new_dim") - h = p_d.get("h") - plot_dict_var = plot_dict.get(var, {}) - plot_dict_t0 = plot_dict_var.get(t0, {}) - plot_dict_order = {"filter_input": filter_input, - "filter_input_nc": filter_input_nc, - "valid_range": valid_range, - "time_range": time_range, - "order": o, "h": h} - plot_dict_t0[i] = plot_dict_order - plot_dict_var[t0] = plot_dict_t0 - plot_dict[var] = plot_dict_var - - for var, viz_date_dict in plot_dict.items(): - for it0, t0 in enumerate(viz_date_dict.keys()): - viz_data = viz_date_dict[t0] - residuum_true = None - for ifilter in sorted(viz_data.keys()): - data = viz_data[ifilter] - filter_input = data["filter_input"] - filter_input_nc = data["filter_input_nc"] if residuum_true is None else residuum_true.sel( - {new_dim: filter_input.coords[new_dim]}) - valid_range = data["valid_range"] - time_axis = data["time_range"] - # 
time_axis = pd.date_range(t_minus, t_plus, freq=sampling) - filter_order = data["order"] - h = data["h"] - t_minus = t0 + np.timedelta64(-int(1.5 * valid_range.start), td_type) - t_plus = t0 + np.timedelta64(int(0.5 * valid_range.start), td_type) - fig, ax = plt.subplots() - ax.axvspan(t0 + np.timedelta64(-valid_range.start, td_type), - t0 + np.timedelta64(valid_range.stop - 1, td_type), color="whitesmoke", - label="valid area") - ax.axvline(t0, color="lightgrey", lw=6, label="time of interest ($t_0$)") - - # original data - ax.plot(time_axis, filter_input_nc.values.flatten(), color="darkgrey", linestyle="dashed", - label="original") - - # clim apriori - if ifilter == 0: - d_tmp = filter_input.sel( - {new_dim: slice(0, filter_input.coords[new_dim].values.max())}).values.flatten() - else: - d_tmp = filter_input.values.flatten() - ax.plot(time_axis[len(time_axis) - len(d_tmp):], d_tmp, color="darkgrey", linestyle="solid", - label="estimated future") - - # clim filter response - filt = xr.apply_ufunc(fir_filter_convolve, filter_input, - input_core_dims=[[new_dim]], - output_core_dims=[[new_dim]], - vectorize=True, - kwargs={"h": h}, - output_dtypes=[filter_input.dtype]) - ax.plot(time_axis, filt.values.flatten(), color="black", linestyle="solid", - label="clim filter response", linewidth=2) - residuum_estimated = filter_input - filt - - # ideal filter response - filt = xr.apply_ufunc(fir_filter_convolve, filter_input_nc, - input_core_dims=[[new_dim]], - output_core_dims=[[new_dim]], - vectorize=True, - kwargs={"h": h}, - output_dtypes=[filter_input.dtype]) - ax.plot(time_axis, filt.values.flatten(), color="black", linestyle="dashed", - label="ideal filter response", linewidth=2) - residuum_true = filter_input_nc - filt - - # set title, legend, and save plot - ax_start = max(t_minus, time_axis[0]) - ax_end = min(t_plus, time_axis[-1]) - ax.set_xlim((ax_start, ax_end)) - plt.title(f"Input of ClimFilter ({str(var)})") - plt.legend() - fig.autofmt_xdate() - plt.tight_layout() - plot_name = os.path.join(plot_folder, - f"climFIR_{self.plot_name}_{str(var)}_{it0}_{ifilter}.pdf") - plt.savefig(plot_name, dpi=300) - plt.close('all') - - # plot residuum - fig, ax = plt.subplots() - ax.axvspan(t0 + np.timedelta64(-valid_range.start, td_type), - t0 + np.timedelta64(valid_range.stop - 1, td_type), color="whitesmoke", - label="valid area") - ax.axvline(t0, color="lightgrey", lw=6, label="time of interest ($t_0$)") - ax.plot(time_axis, residuum_true.values.flatten(), color="black", linestyle="dashed", - label="ideal filter residuum", linewidth=2) - ax.plot(time_axis, residuum_estimated.values.flatten(), color="black", linestyle="solid", - label="clim filter residuum", linewidth=2) - ax.set_xlim((ax_start, ax_end)) - plt.title(f"Residuum of ClimFilter ({str(var)})") - plt.legend(loc="upper left") - fig.autofmt_xdate() - plt.tight_layout() - plot_name = os.path.join(plot_folder, - f"climFIR_{self.plot_name}_{str(var)}_{it0}_{ifilter}_residuum.pdf") - plt.savefig(plot_name, dpi=300) - plt.close('all') - @property def filter_coefficients(self): return self._h diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py index fef52fb27d602b5931587ff0fa2d8edd7e0c2d8f..87f780f9a6133edfcb2f9c71c2956b92f332e915 100644 --- a/mlair/helpers/statistics.py +++ b/mlair/helpers/statistics.py @@ -152,7 +152,10 @@ def min_max_apply(data: Data, _min: Data, _max: Data, feature_range: Data = (0, :param feature_range: scale data to any interval given in feature range. Default is scaling on interval [0, 1]. 
:return: min/max scaled data """ - return (data - _min) / (_max - _min) * (max(feature_range) - min(feature_range)) + min(feature_range) + if not isinstance(feature_range, xr.DataArray): + return (data - _min) / (_max - _min) * (max(feature_range) - min(feature_range)) + min(feature_range) + else: + return (data - _min) / (_max - _min) * (feature_range.max() - feature_range.min()) + feature_range.min() def log(data: Data, dim: Union[str, int]) -> Tuple[Data, Dict[(str, Data)]]: diff --git a/mlair/plotting/data_insight_plotting.py b/mlair/plotting/data_insight_plotting.py index 3bec759076a862d89be6c7495ef9abdebd0d4123..6180493741c030d5dfdfcfa8972035619632c8aa 100644 --- a/mlair/plotting/data_insight_plotting.py +++ b/mlair/plotting/data_insight_plotting.py @@ -8,6 +8,7 @@ import os import logging import multiprocessing import psutil +import sys import numpy as np import pandas as pd @@ -635,16 +636,17 @@ class PlotPeriodogram(AbstractPlotClass): # pragma: no cover @staticmethod def _has_filter_dimension(g, pos): - # check if coords raw data differs from input / target data - check_data = g.id_class - if "filter" not in [check_data.input_data, check_data.target_data][pos].coords.dims: + """Inspect if filtered data is provided and return number and labels of filtered components.""" + check_class = g.id_class + check_data = [check_class.get_X(as_numpy=False), check_class.get_Y(as_numpy=False)][pos] + if not hasattr(check_class, "filter_dim"): # data handler has no filtered data return 1, [] else: - if len(set(check_data._data[0].coords.dims).symmetric_difference(check_data.input_data.coords.dims)) > 0: - return g.id_class.input_data.coords["filter"].shape[0], g.id_class.input_data.coords[ - "filter"].values.tolist() - else: + filter_dim = check_class.filter_dim + if filter_dim not in check_data.coords.dims: # current data has no filter (e.g. 
target data) return 1, [] + else: + return check_data.coords[filter_dim].shape[0], check_data.coords[filter_dim].values.tolist() @TimeTrackingWrapper def _prepare_pgram(self, generator, pos, multiple=1, use_multiprocessing=False): @@ -662,7 +664,6 @@ class PlotPeriodogram(AbstractPlotClass): # pragma: no cover plot_data_mean_single = dict() self.f_index = np.logspace(-3, 0 if self._sampling == "daily" else np.log10(24), 1000) raw_data_single = self._prepare_pgram_parallel_gen(generator, m, pos, use_multiprocessing) - # raw_data_single = self._prepare_pgram_parallel_var(generator, m, pos, use_multiprocessing) for var in raw_data_single.keys(): pgram_com = [] pgram_mean = 0 @@ -857,9 +858,10 @@ def f_proc(var, d_var, f_index, time_dim="datetime"): # pragma: no cover return var_str, f_index, pgram - def f_proc_2(g, m, pos, variables_dim, time_dim, f_index): # pragma: no cover raw_data_single = dict() + if hasattr(g.id_class, "lazy"): + g.id_class.load_lazy() if g.id_class.lazy is True else None if m == 0: d = g.id_class._data else: @@ -871,6 +873,8 @@ def f_proc_2(g, m, pos, variables_dim, time_dim, f_index): # pragma: no cover d_var = d.loc[{variables_dim: var}].squeeze().dropna(time_dim) var_str, f, pgram = f_proc(var, d_var, f_index) raw_data_single[var_str] = [(f, pgram)] + if hasattr(g.id_class, "lazy"): + g.id_class.clean_up() if g.id_class.lazy is True else None return raw_data_single @@ -966,59 +970,63 @@ class PlotClimateFirFilter(AbstractPlotClass): for it0, t0 in enumerate(viz_date_dict.keys()): viz_data = viz_date_dict[t0] residuum_true = None - for ifilter in sorted(viz_data.keys()): - data = viz_data[ifilter] - filter_input = data["filter_input"] - filter_input_nc = data["filter_input_nc"] if residuum_true is None else residuum_true.sel( - {new_dim: filter_input.coords[new_dim]}) - valid_range = data["valid_range"] - time_axis = data["time_range"] - filter_order = data["order"] - h = data["h"] - fig, ax = plt.subplots() - - # plot backgrounds - self._plot_valid_area(ax, t0, valid_range, td_type) - self._plot_t0(ax, t0) - - # original data - self._plot_original_data(ax, time_axis, filter_input_nc) - - # clim apriori - self._plot_apriori(ax, time_axis, filter_input, new_dim, ifilter) - - # clim filter response - residuum_estimated = self._plot_clim_filter(ax, time_axis, filter_input, new_dim, h, + try: + for ifilter in sorted(viz_data.keys()): + data = viz_data[ifilter] + filter_input = data["filter_input"] + filter_input_nc = data["filter_input_nc"] if residuum_true is None else residuum_true.sel( + {new_dim: filter_input.coords[new_dim]}) + valid_range = data["valid_range"] + time_axis = data["time_range"] + filter_order = data["order"] + h = data["h"] + fig, ax = plt.subplots() + + # plot backgrounds + self._plot_valid_area(ax, t0, valid_range, td_type) + self._plot_t0(ax, t0) + + # original data + self._plot_original_data(ax, time_axis, filter_input_nc) + + # clim apriori + self._plot_apriori(ax, time_axis, filter_input, new_dim, ifilter) + + # clim filter response + residuum_estimated = self._plot_clim_filter(ax, time_axis, filter_input, new_dim, h, + output_dtypes=filter_input.dtype) + + # ideal filter response + residuum_true = self._plot_ideal_filter(ax, time_axis, filter_input_nc, new_dim, h, output_dtypes=filter_input.dtype) - # ideal filter response - residuum_true = self._plot_ideal_filter(ax, time_axis, filter_input_nc, new_dim, h, - output_dtypes=filter_input.dtype) - - # set title, legend, and save plot - xlims = self._set_xlim(ax, t0, filter_order, valid_range, 
td_type, time_axis) - - plt.title(f"Input of ClimFilter ({str(var)})") - plt.legend() - fig.autofmt_xdate() - plt.tight_layout() - self.plot_name = f"climFIR_{self._name}_{str(var)}_{it0}_{ifilter}" - self._save() - - # plot residuum - fig, ax = plt.subplots() - self._plot_valid_area(ax, t0, valid_range, td_type) - self._plot_t0(ax, t0) - self._plot_series(ax, time_axis, residuum_true.values.flatten(), style="ideal") - self._plot_series(ax, time_axis, residuum_estimated.values.flatten(), style="clim") - ax.set_xlim(xlims) - plt.title(f"Residuum of ClimFilter ({str(var)})") - plt.legend(loc="upper left") - fig.autofmt_xdate() - plt.tight_layout() - - self.plot_name = f"climFIR_{self._name}_{str(var)}_{it0}_{ifilter}_residuum" - self._save() + # set title, legend, and save plot + xlims = self._set_xlim(ax, t0, filter_order, valid_range, td_type, time_axis) + + plt.title(f"Input of ClimFilter ({str(var)})") + plt.legend() + fig.autofmt_xdate() + plt.tight_layout() + self.plot_name = f"climFIR_{self._name}_{str(var)}_{it0}_{ifilter}" + self._save() + + # plot residuum + fig, ax = plt.subplots() + self._plot_valid_area(ax, t0, valid_range, td_type) + self._plot_t0(ax, t0) + self._plot_series(ax, time_axis, residuum_true.values.flatten(), style="ideal") + self._plot_series(ax, time_axis, residuum_estimated.values.flatten(), style="clim") + ax.set_xlim(xlims) + plt.title(f"Residuum of ClimFilter ({str(var)})") + plt.legend(loc="upper left") + fig.autofmt_xdate() + plt.tight_layout() + + self.plot_name = f"climFIR_{self._name}_{str(var)}_{it0}_{ifilter}_residuum" + self._save() + except Exception as e: + logging.info(f"Could not create plot because of:\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") + pass def _set_xlim(self, ax, t0, order, valid_range, td_type, time_axis): """ diff --git a/mlair/plotting/postprocessing_plotting.py b/mlair/plotting/postprocessing_plotting.py index ecba8a4e0a3369fbb170a7427ef81365d531bc3b..7ba84656cc00a89a7ec88efcc30bf665e0849e2f 100644 --- a/mlair/plotting/postprocessing_plotting.py +++ b/mlair/plotting/postprocessing_plotting.py @@ -847,13 +847,14 @@ class PlotTimeSeries: """ def __init__(self, stations: List, data_path: str, name: str, window_lead_time: int = None, plot_folder: str = ".", - sampling="daily", model_name="nn", obs_name="obs"): + sampling="daily", model_name="nn", obs_name="obs", ahead_dim="ahead"): """Initialise.""" self._data_path = data_path self._data_name = name self._stations = stations self._model_name = model_name self._obs_name = obs_name + self._ahead_dim = ahead_dim self._window_lead_time = self._get_window_lead_time(window_lead_time) self._sampling = self._get_sampling(sampling) self._plot(plot_folder) @@ -876,7 +877,7 @@ class PlotTimeSeries: :param window_lead_time: lead time from arguments to validate :return: validated lead time, comes either from given argument or from data itself """ - ahead_steps = len(self._load_data(self._stations[0]).ahead) + ahead_steps = len(self._load_data(self._stations[0]).coords[self._ahead_dim]) if window_lead_time is None: window_lead_time = ahead_steps return min(ahead_steps, window_lead_time) @@ -898,10 +899,13 @@ class PlotTimeSeries: data_year = data.sel(index=f"{start + i_year}") for i_half_of_year in range(factor): pos = factor * i_year + i_half_of_year - plot_data = self._create_plot_data(data_year, factor, i_half_of_year) - self._plot_obs(axes[pos], plot_data) - self._plot_ahead(axes[pos], plot_data) - if np.isnan(plot_data.values).all(): + try: + plot_data = 
self._create_plot_data(data_year, factor, i_half_of_year) + self._plot_obs(axes[pos], plot_data) + self._plot_ahead(axes[pos], plot_data) + if np.isnan(plot_data.values).all(): + nan_list.append(pos) + except Exception: nan_list.append(pos) self._clean_up_axes(nan_list, axes, fig) self._save_page(station, pdf_pages) @@ -938,9 +942,8 @@ class PlotTimeSeries: def _plot_ahead(self, ax, data): color = sns.color_palette("Blues_d", self._window_lead_time).as_hex() - for ahead in data.coords["ahead"].values: - plot_data = data.sel(type=self._model_name, ahead=ahead).drop(["type", "ahead"]).squeeze().shift( - index=ahead) + for ahead in data.coords[self._ahead_dim].values: + plot_data = data.sel({"type": self._model_name, self._ahead_dim: ahead}).drop(["type", self._ahead_dim]).squeeze().shift(index=ahead) label = f"{ahead}{self._sampling}" ax.plot(plot_data, color=color[ahead - 1], label=label) diff --git a/mlair/run_modules/experiment_setup.py b/mlair/run_modules/experiment_setup.py index b11a2a33417cc27fa23122e31faa089a2dea321c..28277d5057698c01594431008b81d959d415c3e2 100644 --- a/mlair/run_modules/experiment_setup.py +++ b/mlair/run_modules/experiment_setup.py @@ -21,7 +21,7 @@ from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_USE_ALL_STATIONS_ON_ALL_DATA_SETS, DEFAULT_EVALUATE_BOOTSTRAPS, DEFAULT_CREATE_NEW_BOOTSTRAPS, \ DEFAULT_NUMBER_OF_BOOTSTRAPS, DEFAULT_PLOT_LIST, DEFAULT_SAMPLING, DEFAULT_DATA_ORIGIN, DEFAULT_ITER_DIM, \ DEFAULT_USE_MULTIPROCESSING, DEFAULT_USE_MULTIPROCESSING_ON_DEBUG, DEFAULT_MAX_NUMBER_MULTIPROCESSING, \ - DEFAULT_BOOTSTRAP_TYPE, DEFAULT_BOOTSTRAP_METHOD + DEFAULT_BOOTSTRAP_TYPE, DEFAULT_BOOTSTRAP_METHOD, DEFAULT_OVERWRITE_LAZY_DATA from mlair.data_handler import DefaultDataHandler from mlair.run_modules.run_environment import RunEnvironment from mlair.model_modules.fully_connected_networks import FCN_64_32_16 as VanillaModel @@ -218,7 +218,8 @@ class ExperimentSetup(RunEnvironment): hpc_hosts=None, model=None, batch_size=None, epochs=None, data_handler=None, data_origin: Dict = None, competitors: list = None, competitor_path: str = None, use_multiprocessing: bool = None, use_multiprocessing_on_debug: bool = None, - max_number_multiprocessing: int = None, start_script: Union[Callable, str] = None, **kwargs): + max_number_multiprocessing: int = None, start_script: Union[Callable, str] = None, + overwrite_lazy_data: bool = None, **kwargs): # create run framework super().__init__() @@ -301,6 +302,8 @@ class ExperimentSetup(RunEnvironment): self._set_param("window_history_size", window_history_size, default=DEFAULT_WINDOW_HISTORY_SIZE) self._set_param("overwrite_local_data", overwrite_local_data, default=DEFAULT_OVERWRITE_LOCAL_DATA, scope="preprocessing") + self._set_param("overwrite_lazy_data", overwrite_lazy_data, default=DEFAULT_OVERWRITE_LAZY_DATA, + scope="preprocessing") self._set_param("transformation", transformation, default={}) self._set_param("transformation", None, scope="preprocessing") self._set_param("data_handler", data_handler, default=DefaultDataHandler) diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py index edab0a3e8ad3ee6f7eeb50318f617d3b661255bd..4387f359d05b7c2d06c98bb1ad346d058afc3056 100644 --- a/mlair/run_modules/post_processing.py +++ b/mlair/run_modules/post_processing.py @@ -6,6 +6,8 @@ __date__ = '2019-12-11' import inspect import logging import os +import sys +import traceback from typing import Dict, Tuple, Union, List, Callable import keras @@ -336,7 +338,8 @@ 
class PostProcessing(RunEnvironment): PlotSeparationOfScales(self.test_data, plot_folder=self.plot_path, time_dim=time_dim, window_dim=window_dim, target_dim=target_dim, **{"filter_dim": filter_dim}) except Exception as e: - logging.error(f"Could not create plot PlotSeparationOfScales due to the following error: {e}") + logging.error(f"Could not create plot PlotSeparationOfScales due to the following error:" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") try: if (self.bootstrap_skill_scores is not None) and ("PlotBootstrapSkillScore" in plot_list): @@ -349,7 +352,8 @@ class PostProcessing(RunEnvironment): bootstrap_type=boot_type, bootstrap_method=boot_method) except Exception as e: logging.error(f"Could not create plot PlotBootstrapSkillScore ({boot_type}, {boot_method}) " - f"due to the following error: {e}") + f"due to the following error:\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n" + f"{sys.exc_info()[2]}") except Exception as e: logging.error(f"Could not create plot PlotBootstrapSkillScore due to the following error: {e}") @@ -357,14 +361,16 @@ class PostProcessing(RunEnvironment): if "PlotConditionalQuantiles" in plot_list: PlotConditionalQuantiles(self.test_data.keys(), data_pred_path=path, plot_folder=self.plot_path) except Exception as e: - logging.error(f"Could not create plot PlotConditionalQuantiles due to the following error: {e}") + logging.error(f"Could not create plot PlotConditionalQuantiles due to the following error:" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") try: if "PlotMonthlySummary" in plot_list: PlotMonthlySummary(self.test_data.keys(), path, r"forecasts_%s_test.nc", self.target_var, plot_folder=self.plot_path) except Exception as e: - logging.error(f"Could not create plot PlotMonthlySummary due to the following error: {e}") + logging.error(f"Could not create plot PlotMonthlySummary due to the following error:" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") try: if "PlotClimatologicalSkillScore" in plot_list: @@ -373,21 +379,24 @@ class PostProcessing(RunEnvironment): PlotClimatologicalSkillScore(self.skill_scores[1], plot_folder=self.plot_path, score_only=False, extra_name_tag="all_terms_", model_setup=self.forecast_indicator) except Exception as e: - logging.error(f"Could not create plot PlotClimatologicalSkillScore due to the following error: {e}") + logging.error(f"Could not create plot PlotClimatologicalSkillScore due to the following error: {e}" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") try: if "PlotCompetitiveSkillScore" in plot_list: PlotCompetitiveSkillScore(self.skill_scores[0], plot_folder=self.plot_path, model_setup=self.forecast_indicator) except Exception as e: - logging.error(f"Could not create plot PlotCompetitiveSkillScore due to the following error: {e}") + logging.error(f"Could not create plot PlotCompetitiveSkillScore due to the following error: {e}" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") try: if "PlotTimeSeries" in plot_list: PlotTimeSeries(self.test_data.keys(), path, r"forecasts_%s_test.nc", plot_folder=self.plot_path, - sampling=self._sampling) + sampling=self._sampling, ahead_dim=self.ahead_dim) except Exception as e: - logging.error(f"Could not create plot PlotTimeSeries due to the following error: {e}") + logging.error(f"Could not create plot PlotTimeSeries due to the following error:\n{sys.exc_info()[0]}\n" + f"{sys.exc_info()[1]}\n{sys.exc_info()[2]}\n{traceback.format_exc()}") try: if "PlotStationMap" in 
plot_list: @@ -404,7 +413,8 @@ class PostProcessing(RunEnvironment): (self.test_data, {"marker": 9, "ms": 9})] PlotStationMap(generators=gens, plot_folder=self.plot_path, plot_name="station_map_var") except Exception as e: - logging.error(f"Could not create plot PlotStationMap due to the following error: {e}") + logging.error(f"Could not create plot PlotStationMap due to the following error: {e}" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") try: if "PlotAvailability" in plot_list: @@ -412,7 +422,8 @@ class PostProcessing(RunEnvironment): PlotAvailability(avail_data, plot_folder=self.plot_path, time_dimension=time_dim, window_dimension=window_dim) except Exception as e: - logging.error(f"Could not create plot PlotAvailability due to the following error: {e}") + logging.error(f"Could not create plot PlotAvailability due to the following error: {e}" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") try: if "PlotAvailabilityHistogram" in plot_list: @@ -420,7 +431,8 @@ class PostProcessing(RunEnvironment): PlotAvailabilityHistogram(avail_data, plot_folder=self.plot_path, station_dim=iter_dim, history_dim=window_dim) except Exception as e: - logging.error(f"Could not create plot PlotAvailabilityHistogram due to the following error: {e}") + logging.error(f"Could not create plot PlotAvailabilityHistogram due to the following error: {e}" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") try: if "PlotPeriodogram" in plot_list: @@ -428,7 +440,8 @@ class PostProcessing(RunEnvironment): variables_dim=target_dim, sampling=self._sampling, use_multiprocessing=use_multiprocessing) except Exception as e: - logging.error(f"Could not create plot PlotPeriodogram due to the following error: {e}") + logging.error(f"Could not create plot PlotPeriodogram due to the following error: {e}" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") try: if "PlotDataHistogram" in plot_list: @@ -437,7 +450,8 @@ class PostProcessing(RunEnvironment): PlotDataHistogram(gens, plot_folder=self.plot_path, time_dim=time_dim, variables_dim=target_dim, upsampling=upsampling) except Exception as e: - logging.error(f"Could not create plot PlotDataHistogram due to the following error: {e}") + logging.error(f"Could not create plot PlotDataHistogram due to the following error: {e}" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") def calculate_test_score(self): """Evaluate test score of model and save locally.""" diff --git a/run_climate_filter.py b/run_climate_filter.py index 4aacab8817b2f6350de861ef383b4777790bc57c..5577375e2fc135676f71151791c1d564dcb25a2e 100755 --- a/run_climate_filter.py +++ b/run_climate_filter.py @@ -5,41 +5,80 @@ import argparse from mlair.workflows import DefaultWorkflow from mlair.data_handler.data_handler_mixed_sampling import DataHandlerMixedSamplingWithClimateFirFilter +from mlair.model_modules.fully_connected_networks import BranchedInputFCN as NN +from mlair.configuration.defaults import DEFAULT_PLOT_LIST, DEFAULT_TRAIN_END -stats = {'o3': 'dma8eu', 'no': 'dma8eu', 'no2': 'dma8eu', - 'relhum': 'average_values', 'u': 'average_values', 'v': 'average_values', - 'cloudcover': 'average_values', 'pblheight': 'maximum', - 'temp': 'maximum'} -data_origin = {'o3': '', 'no': '', 'no2': '', - 'relhum': 'REA', 'u': 'REA', 'v': 'REA', - 'cloudcover': 'REA', 'pblheight': 'REA', - 'temp': 'REA'} + +STATS = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', + 'v': 'average_values', 'no': 
'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', + 'pblheight': 'maximum'} + +DATA_ORIGIN = {"no": "", "no2": "", "o3": "", + "cloudcover": "REA", "pblheight": "REA", "relhum": "REA", + "temp": "REA", "u": "REA", "v": "REA"} def main(parser_args): - args = dict(stations=["DEBW107", "DEBW013"], - network="UBA", - evaluate_bootstraps=False, plot_list=[], - data_origin=data_origin, data_handler=DataHandlerMixedSamplingWithClimateFirFilter, - interpolation_limit=(3, 1), overwrite_local_data=False, - lazy_preprocessing=True, - use_multiprocessing=True, - use_multiprocessing_on_debug=True, - sampling=("hourly", "daily"), - statistics_per_var=stats, - create_new_model=True, train_model=False, epochs=1, - window_history_size=6 * 24 + 16, - window_history_offset=16, - kz_filter_length=[100 * 24, 15 * 24], - kz_filter_iter=[4, 5], - filter_cutoff_period=[7, 0.8], - filter_order=[7, 2], - start="2006-01-01", - train_start="2006-01-01", - end="2011-12-31", - test_end="2011-12-31", - **parser_args.__dict__, - ) + # plots = remove_items(DEFAULT_PLOT_LIST, "PlotConditionalQuantiles") + args = dict( + sampling=("hourly", "daily"), # T1A, T1D + stations=["DEBW107", "DEBW013"], # [:5], # T1B, TODO: remove indexing for meaningful experiments + network="UBA", # T1B + variables=["o3", "no", "no2", "cloudcover", "pblheight", "relhum", "temp", "u", "v"], # T1B + statistics_per_var=STATS, # T1A, T1D, T1F + + data_origin=DATA_ORIGIN, + # data_handler=DataHandlerMixedSampling, + # window_history_offset=16, + # window_history_size=6 * 24 + 16, # T1D + data_handler=DataHandlerMixedSamplingWithClimateFirFilter, + filter_cutoff_period=[21], + filter_order=[42], + filter_window_type=("kaiser", 5), + filter_add_unfiltered=True, + apriori_sel_opts=slice(DEFAULT_TRAIN_END), + apriori_type="residuum_stats", + apriori_diurnal=True, + use_filter_branches=True, + + window_history_size=2 * 24 + 16, + window_history_offset=16, # T1F + window_lead_time=4, # T1D + target_var="o3", # T1D + interpolation_limit=(24, 2), # T1F + transformation={ + "o3": {"method": "log"}, + "no": {"method": "log"}, + "no2": {"method": "log"}, + "cloudcover": {"method": "min_max", "feature_range": [-1, 1]}, + "pblheight": {"method": "log"}, + "relhum": {"method": "min_max", "feature_range": [-1, 1]}, + "temp": {"method": "standardise"}, + "u": {"method": "standardise"}, + "v": {"method": "standardise"}, }, # T1F, also apply same target transformation + + start="2006-01-01", + train_start="2006-01-01", + end="2011-12-31", + test_end="2011-12-31", + + train_model=False, create_new_model=True, + epochs=1, + model=NN, + evaluate_bootstraps=False, + bootstrap_type=["singleinput", "branch", "variable"], + bootstrap_method=["shuffle", "zero_mean"], + plot_list=DEFAULT_PLOT_LIST, + # competitors=["IntelliO3-ts-v1", "MFCN_64_32_16_climFIR", "FCN_1449_512_32_4"], # "FCN_1449_16_8_4",], + lazy_preprocessing=True, + use_multiprocessing=True, + use_multiprocessing_on_debug=False, + overwrite_local_data=False, + overwrite_lazy_data=False, + max_number_multiprocessing=2, + **parser_args.__dict__ + ) + print(parser_args.__dict__) workflow = DefaultWorkflow(**args, start_script=__file__) workflow.run()
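
Note on the `min_max_apply` change in mlair/helpers/statistics.py above: the new branch accepts `feature_range` either as a plain sequence or as an `xr.DataArray`, apparently to cover the case where the per-variable `transform` in data_handler_single_station broadcasts non-DataArray options onto the data's coordinates before calling the helper. A minimal, self-contained sketch of the same scaling formula (plain numpy with hypothetical values; not MLAir's actual helper):

    import numpy as np

    def min_max_apply(data, _min, _max, feature_range=(0, 1)):
        # rescale data from [_min, _max] onto the interval given by feature_range,
        # matching the non-DataArray branch of the patched statistics.min_max_apply
        fr_min, fr_max = min(feature_range), max(feature_range)
        return (data - _min) / (_max - _min) * (fr_max - fr_min) + fr_min

    x = np.array([0.0, 5.0, 10.0])
    print(min_max_apply(x, 0.0, 10.0))                         # [0.  0.5 1. ]
    print(min_max_apply(x, 0.0, 10.0, feature_range=(-1, 1)))  # [-1.  0.  1.]

This is the scaling used by the `"min_max"` entries (e.g. `{"method": "min_max", "feature_range": [-1, 1]}`) in the run script's transformation settings above.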