From 2ba4fe1a3edb7db4479f5fa083ffc7312f36e99d Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Fri, 12 Mar 2021 16:45:02 +0100 Subject: [PATCH 01/17] first fix to be able to run again --- mlair/data_handler/data_handler_mixed_sampling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index caaa7a62..8159abda 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -204,7 +204,7 @@ class DataHandlerSeparationOfScalesSingleStation(DataHandlerMixedSamplingWithFil time_deltas = np.round(self.time_delta(self.cutoff_period)).astype(int) start, end = window, 1 res = [] - window_array = self.create_index_array(self.window_dim.range(start, end), squeeze_dim=self.target_dim) + window_array = self.create_index_array(self.window_dim, range(start, end), squeeze_dim=self.target_dim) for delta, filter_name in zip(np.append(time_deltas, 1), data.coords["filter"]): res_filter = [] data_filter = data.sel({"filter": filter_name}) -- GitLab From de191bedaa41dec1555a492627b06d969a52f172 Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Fri, 12 Mar 2021 17:09:09 +0100 Subject: [PATCH 02/17] new req bottleneck improves speed for kzf --- HPC_setup/requirements_HDFML_additionals.txt | 1 + HPC_setup/requirements_JUWELS_additionals.txt | 1 + mlair/helpers/statistics.py | 4 +++- requirements.txt | 1 + requirements_gpu.txt | 1 + 5 files changed, 7 insertions(+), 1 deletion(-) diff --git a/HPC_setup/requirements_HDFML_additionals.txt b/HPC_setup/requirements_HDFML_additionals.txt index 12e09ccd..26e335d5 100644 --- a/HPC_setup/requirements_HDFML_additionals.txt +++ b/HPC_setup/requirements_HDFML_additionals.txt @@ -2,6 +2,7 @@ absl-py==0.11.0 appdirs==1.4.4 astor==0.8.1 attrs==20.3.0 +bottleneck==1.3.2 cached-property==1.5.2 certifi==2020.12.5 cftime==1.4.1 diff --git a/HPC_setup/requirements_JUWELS_additionals.txt b/HPC_setup/requirements_JUWELS_additionals.txt index 12e09ccd..26e335d5 100644 --- a/HPC_setup/requirements_JUWELS_additionals.txt +++ b/HPC_setup/requirements_JUWELS_additionals.txt @@ -2,6 +2,7 @@ absl-py==0.11.0 appdirs==1.4.4 astor==0.8.1 attrs==20.3.0 +bottleneck==1.3.2 cached-property==1.5.2 certifi==2020.12.5 cftime==1.4.1 diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py index 3631597a..57d7802e 100644 --- a/mlair/helpers/statistics.py +++ b/mlair/helpers/statistics.py @@ -616,9 +616,11 @@ class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass): wl(int): a window length itr(int): a number of iteration """ + import warnings + warnings.filterwarnings("ignore") df_itr = df.__deepcopy__() try: - kwargs = {"min_periods": 1, + kwargs = {"min_periods": int(0.7 * wl), "center": True, self.filter_dim: wl} iter_vars = df_itr.coords["variables"].values diff --git a/requirements.txt b/requirements.txt index b0a6e7f5..51d6e023 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ absl-py==0.11.0 appdirs==1.4.4 astor==0.8.1 attrs==20.3.0 +bottleneck==1.3.2 cached-property==1.5.2 certifi==2020.12.5 cftime==1.4.1 diff --git a/requirements_gpu.txt b/requirements_gpu.txt index 35fe0d5e..11a5c8ae 100644 --- a/requirements_gpu.txt +++ b/requirements_gpu.txt @@ -2,6 +2,7 @@ absl-py==0.11.0 appdirs==1.4.4 astor==0.8.1 attrs==20.3.0 +bottleneck==1.3.2 cached-property==1.5.2 certifi==2020.12.5 cftime==1.4.1 -- GitLab From 
7376d0c9d2fadaaa926e23f6120b34315a826d4f Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Mon, 15 Mar 2021 12:52:16 +0100 Subject: [PATCH 03/17] use he init when using relu activations --- mlair/model_modules/fully_connected_networks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mlair/model_modules/fully_connected_networks.py b/mlair/model_modules/fully_connected_networks.py index 007b8f0d..7108d9a3 100644 --- a/mlair/model_modules/fully_connected_networks.py +++ b/mlair/model_modules/fully_connected_networks.py @@ -67,7 +67,8 @@ class FCN(AbstractModelClass): "sigmoid": partial(keras.layers.Activation, "sigmoid"), "linear": partial(keras.layers.Activation, "linear"), "selu": partial(keras.layers.Activation, "selu")} - _initializer = {"selu": keras.initializers.lecun_normal()} + _initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform", + "relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal()} _optimizer = {"adam": keras.optimizers.adam, "sgd": keras.optimizers.SGD} _regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2} _requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"] -- GitLab From 1f13155f000f62c5bc8ab1e2f62c658bf6714bea Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Mon, 15 Mar 2021 14:03:50 +0100 Subject: [PATCH 04/17] added prelu activation, alpha dropout is used for selu act, FCN_64_32_16 is now an inheritance of FCN class --- mlair/model_modules/abstract_model_class.py | 4 +- .../model_modules/fully_connected_networks.py | 100 +++++++----------- 2 files changed, 43 insertions(+), 61 deletions(-) diff --git a/mlair/model_modules/abstract_model_class.py b/mlair/model_modules/abstract_model_class.py index 894ff7ac..989f4578 100644 --- a/mlair/model_modules/abstract_model_class.py +++ b/mlair/model_modules/abstract_model_class.py @@ -82,7 +82,7 @@ class AbstractModelClass(ABC): self.__custom_objects = value @property - def compile_options(self) -> Callable: + def compile_options(self) -> Dict: """ The compile options property allows the user to use all keras.compile() arguments. They can ether be passed as dictionary (1), as attribute, without setting compile_options (2) or as mixture (partly defined as instance @@ -116,7 +116,7 @@ class AbstractModelClass(ABC): def set_compile_options(self): self.optimizer = keras.optimizers.SGD() self.loss = keras.losses.mean_squared_error - self.compile_options = {"optimizer" = keras.optimizers.Adam(), "metrics": ["mse", "mae"]} + self.compile_options = {"optimizer": keras.optimizers.Adam(), "metrics": ["mse", "mae"]} Note: * As long as the attribute and the dict value have exactly the same values, the setter method will not raise diff --git a/mlair/model_modules/fully_connected_networks.py b/mlair/model_modules/fully_connected_networks.py index 7108d9a3..9fb08cdf 100644 --- a/mlair/model_modules/fully_connected_networks.py +++ b/mlair/model_modules/fully_connected_networks.py @@ -10,53 +10,6 @@ from mlair.model_modules.loss import var_loss, custom_loss import keras -class FCN_64_32_16(AbstractModelClass): - """ - A customised model 4 Dense layers (64, 32, 16, window_lead_time), where the last layer is the output layer depending - on the window_lead_time parameter. - """ - - def __init__(self, input_shape: list, output_shape: list): - """ - Sets model and loss depending on the given arguments. 
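PATCH 03 above pairs each activation with a matching variance-scaling initialiser: He for ReLU, LeCun for SELU, Glorot for the saturating/linear cases. The rationale in one numpy sketch — the formulas below are the standard published definitions, not MLAir code:

import numpy as np

def init_std(fan_in, fan_out, activation):
    # He preserves activation variance through ReLU layers (half the inputs
    # are zeroed), LeCun matches SELU's self-normalising fixed point, and
    # Glorot balances forward/backward variance for symmetric activations.
    if activation == "relu":                     # he_normal
        return np.sqrt(2.0 / fan_in)
    if activation == "selu":                     # lecun_normal
        return np.sqrt(1.0 / fan_in)
    # glorot_uniform draws from U(-limit, limit) with limit = sqrt(6/(fan_in+fan_out)),
    # which has standard deviation sqrt(2/(fan_in+fan_out))
    return np.sqrt(2.0 / (fan_in + fan_out))

for act in ("relu", "selu", "tanh"):
    print(act, round(init_std(64, 32, act), 4))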
- - :param input_shape: list of input shapes (expect len=1 with shape=(window_hist, station, variables)) - :param output_shape: list of output shapes (expect len=1 with shape=(window_forecast)) - """ - - assert len(input_shape) == 1 - assert len(output_shape) == 1 - super().__init__(input_shape[0], output_shape[0]) - - # settings - self.activation = keras.layers.PReLU - - # apply to model - self.set_model() - self.set_compile_options() - self.set_custom_objects(loss=self.compile_options['loss']) - - def set_model(self): - """ - Build the model. - """ - x_input = keras.layers.Input(shape=self._input_shape) - x_in = keras.layers.Flatten()(x_input) - x_in = keras.layers.Dense(64, name="Dense_64")(x_in) - x_in = self.activation()(x_in) - x_in = keras.layers.Dense(32, name="Dense_32")(x_in) - x_in = self.activation()(x_in) - x_in = keras.layers.Dense(16, name="Dense_16")(x_in) - x_in = self.activation()(x_in) - x_in = keras.layers.Dense(self._output_shape, name="Dense_output")(x_in) - out_main = self.activation()(x_in) - self.model = keras.Model(inputs=x_input, outputs=[out_main]) - - def set_compile_options(self): - self.optimizer = keras.optimizers.adam(lr=1e-2) - self.compile_options = {"loss": [keras.losses.mean_squared_error], "metrics": ["mse", "mae"]} - - class FCN(AbstractModelClass): """ A customisable fully connected network (64, 32, 16, window_lead_time), where the last layer is the output layer depending @@ -66,12 +19,15 @@ class FCN(AbstractModelClass): _activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"), "sigmoid": partial(keras.layers.Activation, "sigmoid"), "linear": partial(keras.layers.Activation, "linear"), - "selu": partial(keras.layers.Activation, "selu")} + "selu": partial(keras.layers.Activation, "selu"), + "prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25))} _initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform", - "relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal()} + "relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal(), + "prelu": keras.initializers.he_normal()} _optimizer = {"adam": keras.optimizers.adam, "sgd": keras.optimizers.SGD} _regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2} _requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"] + _dropout = {"selu": keras.layers.AlphaDropout} def __init__(self, input_shape: list, output_shape: list, activation="relu", activation_output="linear", optimizer="adam", n_layer=1, n_hidden=10, regularizer=None, dropout=None, layer_configuration=None, @@ -97,12 +53,12 @@ class FCN(AbstractModelClass): self._update_model_name() self.kernel_initializer = self._initializer.get(activation, "glorot_uniform") self.kernel_regularizer = self._set_regularizer(regularizer, **kwargs) - self.dropout = self._set_dropout(dropout) + self.dropout, self.dropout_rate = self._set_dropout(activation, dropout) # apply to model self.set_model() self.set_compile_options() - self.set_custom_objects(loss=custom_loss([keras.losses.mean_squared_error, var_loss]), var_loss=var_loss) + self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss) def _set_activation(self, activation): try: @@ -140,12 +96,11 @@ class FCN(AbstractModelClass): except KeyError: raise AttributeError(f"Given regularizer {regularizer} is not supported in this model class.") 
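The `_dropout` table above routes SELU networks to keras.layers.AlphaDropout: plain dropout would destroy SELU's self-normalising property by inflating the variance, while alpha dropout is constructed to keep mean and variance approximately intact. A standalone numpy illustration — the constant is the published SELU alpha', everything else here is illustrative, not MLAir code:

import numpy as np

rng = np.random.default_rng(0)
x = rng.standard_normal(100_000)  # SELU keeps activations near mean 0, var 1

def dropout(x, rate):
    # inverted dropout: zero out units, rescale the survivors
    keep = rng.random(x.shape) >= rate
    return np.where(keep, x / (1.0 - rate), 0.0)

def alpha_dropout(x, rate, alpha_p=-1.7580993408473766):  # -lambda*alpha of SELU
    # dropped units are set to alpha_p, then an affine correction (a, b)
    # restores zero mean and unit variance
    keep = rng.random(x.shape) >= rate
    a = ((1.0 - rate) * (1.0 + rate * alpha_p ** 2)) ** -0.5
    b = -a * alpha_p * rate
    return a * np.where(keep, x, alpha_p) + b

for f in (dropout, alpha_dropout):
    y = f(x, 0.1)
    print(f"{f.__name__}: mean={y.mean():+.3f} std={y.std():.3f}")
# plain dropout inflates the std to ~1.05; alpha dropout stays at ~(0, 1)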
- @staticmethod - def _set_dropout(dropout): - if dropout is None: - return dropout - assert 0 <= dropout < 1 - return dropout + def _set_dropout(self, activation, dropout_rate): + if dropout_rate is None: + return None, None + assert 0 <= dropout_rate < 1 + return self._dropout.get(activation, keras.layers.Dropout), dropout_rate def _update_model_name(self): n_input = str(reduce(lambda x, y: x * y, self._input_shape)) @@ -169,7 +124,7 @@ class FCN(AbstractModelClass): kernel_regularizer=self.kernel_regularizer)(x_in) x_in = self.activation(name=f"{self.activation_name}_{layer + 1}")(x_in) if self.dropout is not None: - x_in = keras.layers.Dropout(self.dropout)(x_in) + x_in = self.dropout(self.dropout_rate)(x_in) else: assert isinstance(self.layer_configuration, list) is True for layer, n_hidden in enumerate(self.layer_configuration): @@ -177,7 +132,7 @@ class FCN(AbstractModelClass): kernel_regularizer=self.kernel_regularizer)(x_in) x_in = self.activation(name=f"{self.activation_name}_{layer + 1}")(x_in) if self.dropout is not None: - x_in = keras.layers.Dropout(self.dropout)(x_in) + x_in = self.dropout(self.dropout_rate)(x_in) x_in = keras.layers.Dense(self._output_shape)(x_in) out = self.activation_output(name=f"{self.activation_output_name}_output")(x_in) self.model = keras.Model(inputs=x_input, outputs=[out]) @@ -185,3 +140,30 @@ class FCN(AbstractModelClass): def set_compile_options(self): self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss])], "metrics": ["mse", "mae", var_loss]} + + +class FCN_64_32_16(FCN): + """ + A customised model 4 Dense layers (64, 32, 16, window_lead_time), where the last layer is the output layer depending + on the window_lead_time parameter. + """ + + _requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad"] + + def __init__(self, input_shape: list, output_shape: list, **kwargs): + """ + Sets model and loss depending on the given arguments. 
+ + :param input_shape: list of input shapes (expect len=1 with shape=(window_hist, station, variables)) + :param output_shape: list of output shapes (expect len=1 with shape=(window_forecast)) + """ + lr = kwargs.pop("lr", 1e-2) + super().__init__(input_shape, output_shape, activation="prelu", activation_output="linear", + layer_configuration=[64, 32, 16], optimizer="adam", lr=lr, **kwargs) + + def set_compile_options(self): + self.compile_options = {"loss": [keras.losses.mean_squared_error], "metrics": ["mse", "mae"]} + + def _update_model_name(self): + self.model_name = "FCN" + super()._update_model_name() -- GitLab From 96942e6e3ac9875ce8b78ab8cb999be85e1e4918 Mon Sep 17 00:00:00 2001 From: lukas leufen <l.leufen@fz-juelich.de> Date: Tue, 16 Mar 2021 09:41:42 +0000 Subject: [PATCH 05/17] try to fix six error --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index b0a6e7f5..c4b281bb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -51,7 +51,7 @@ PyYAML==5.4.1 requests==2.25.1 scipy==1.5.4 seaborn==0.11.1 -six==1.15.0 +--ignore-installed six==1.15.0 statsmodels==0.12.2 tabulate==0.8.8 tensorboard==1.13.1 -- GitLab From 7aefc11fbe46e5807693ebdf45dfbc6fea96e569 Mon Sep 17 00:00:00 2001 From: lukas leufen <l.leufen@fz-juelich.de> Date: Tue, 16 Mar 2021 09:46:14 +0000 Subject: [PATCH 06/17] change six installation --- .gitlab-ci.yml | 2 +- requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f4d042f0..eacbe3e2 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -42,7 +42,7 @@ tests (from scratch): - ./CI/update_badge.sh > /dev/null script: - pip install --upgrade pip - - pip install numpy wheel six + - pip install numpy wheel six==1.15.0 - zypper --non-interactive install binutils libproj-devel gdal-devel - zypper --non-interactive install proj geos-devel # - cat requirements.txt | cut -f1 -d"#" | sed '/^\s*$/d' | xargs -L 1 pip install diff --git a/requirements.txt b/requirements.txt index c4b281bb..b0a6e7f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -51,7 +51,7 @@ PyYAML==5.4.1 requests==2.25.1 scipy==1.5.4 seaborn==0.11.1 ---ignore-installed six==1.15.0 +six==1.15.0 statsmodels==0.12.2 tabulate==0.8.8 tensorboard==1.13.1 -- GitLab From 614866546c57b9ac85eb9b173066dd703ba711e1 Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Tue, 16 Mar 2021 14:44:24 +0100 Subject: [PATCH 07/17] kzf per variable seems to be faster than over variables, check on HPC --- .../data_handler_mixed_sampling.py | 6 +++ mlair/helpers/statistics.py | 48 ++++++++++++++++++- 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index 8159abda..c56499dc 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -114,6 +114,12 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi self._data = list(map(self.load_and_interpolate, [0, 1])) # load input (0) and target (1) data self.set_inputs_and_targets() self.apply_kz_filter() + # lazy data loading on first time if possible + # * store the kz data locally in data path under different folder /e.g. kzf_data + # * create a checksum for the name and reuse this data always if checksum fits (this will replace all previous + # steps and save a lot of computation time. 
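A usage note on the PATCH 04 refactor above, before the data-handler work continues: FCN_64_32_16 is now only a preset of the generic FCN. A hypothetical instantiation of the refactored class, assuming MLAir is installed — shapes and hyperparameters here are illustrative only:

from mlair.model_modules.fully_connected_networks import FCN

# one input branch: 65 history steps, 1 station, 9 variables; 4 lead times out
model = FCN(input_shape=[(65, 1, 9)], output_shape=[(4,)],
            activation="prelu", activation_output="linear",
            layer_configuration=[64, 32, 16],  # overrides n_layer/n_hidden
            dropout=0.25, optimizer="adam", lr=1e-3)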
+ # lazy create of subsets by reusing as much as possible + # * start here when using preprocessed data, select new start and end if self.do_transformation is True: self.call_transform() self.make_samples() diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py index 57d7802e..0b73bc27 100644 --- a/mlair/helpers/statistics.py +++ b/mlair/helpers/statistics.py @@ -11,8 +11,10 @@ import pandas as pd from typing import Union, Tuple, Dict, List from matplotlib import pyplot as plt import itertools +import gc +import warnings -from mlair.helpers import to_list +from mlair.helpers import to_list, TimeTracking, TimeTrackingWrapper Data = Union[xr.DataArray, pd.DataFrame] @@ -608,6 +610,48 @@ class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass): else: return None + @TimeTrackingWrapper + def kz_filter_new(self, df, wl, itr): + """ + It passes the low frequency time series. + + If filter method is from mean, max, min this method will call construct and rechunk before the actual + calculation to improve performance. If filter method is either median or percentile this approach is not + applicable and depending on the data and window size, this method can become slow. + + Args: + wl(int): a window length + itr(int): a number of iteration + """ + warnings.filterwarnings("ignore") + df_itr = df.__deepcopy__() + try: + kwargs = {"min_periods": int(0.7 * wl), + "center": True, + self.filter_dim: wl} + for i in np.arange(0, itr): + print(i) + rolling = df_itr.chunk().rolling(**kwargs) + if self.method not in ["percentile", "median"]: + rolling = rolling.construct("construct").chunk("auto") + if self.method == "median": + df_mv_avg_tmp = rolling.median() + elif self.method == "percentile": + df_mv_avg_tmp = rolling.quantile(self.percentile) + elif self.method == "max": + df_mv_avg_tmp = rolling.max("construct") + elif self.method == "min": + df_mv_avg_tmp = rolling.min("construct") + else: + df_mv_avg_tmp = rolling.mean("construct") + df_itr = df_mv_avg_tmp.compute() + del df_mv_avg_tmp, rolling + gc.collect() + return df_itr + except ValueError: + raise ValueError + + @TimeTrackingWrapper def kz_filter(self, df, wl, itr): """ It passes the low frequency time series. 
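The kz_filter_new variant above leans on xarray's rolling-window machinery; its construct() trick in isolation, on toy data and without the dask chunking — a minimal sketch, not MLAir code:

import numpy as np
import xarray as xr

da = xr.DataArray(np.random.rand(200), dims="datetime")
wl = 13  # window length, as in the kz filter kwargs
rolling = da.rolling(datetime=wl, center=True, min_periods=int(0.7 * wl))
# construct() exposes every window along a new dimension, turning the
# smoothing into one vectorised reduction instead of a Python-level loop;
# this is what makes the mean/max/min branch fast
smoothed = rolling.construct("construct").mean("construct")
# note: the NaN-skipping mean over the constructed axis effectively relaxes
# min_periods at the series edges, unlike a direct rolling.mean()
print(smoothed.shape, float(smoothed[0]))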
@@ -639,7 +683,7 @@ class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass): else: df_mv_avg_tmp = rolling.mean() df_itr_var = df_mv_avg_tmp.compute() - df_itr = df_itr.drop_sel(variables=var).combine_first(df_itr_var) + df_itr.loc[{"variables": [var]}] = df_itr_var return df_itr except ValueError: raise ValueError -- GitLab From faaa3388db7ba50ab7627de6537d4c23efad7e6d Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Tue, 16 Mar 2021 17:39:00 +0100 Subject: [PATCH 08/17] single compute call was missing --- mlair/data_handler/data_handler_mixed_sampling.py | 2 +- mlair/helpers/statistics.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index c56499dc..c62e18f2 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -218,7 +218,7 @@ class DataHandlerSeparationOfScalesSingleStation(DataHandlerMixedSamplingWithFil res_filter.append(data_filter.shift({dim: -w * delta})) res_filter = xr.concat(res_filter, dim=window_array).chunk() res.append(res_filter) - res = xr.concat(res, dim="filter") + res = xr.concat(res, dim="filter").compute() return res def estimate_filter_width(self): diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py index 0b73bc27..a8ba9795 100644 --- a/mlair/helpers/statistics.py +++ b/mlair/helpers/statistics.py @@ -669,8 +669,9 @@ class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass): self.filter_dim: wl} iter_vars = df_itr.coords["variables"].values for var in iter_vars: - df_itr_var = df_itr.sel(variables=[var]).chunk() + df_itr_var = df_itr.sel(variables=[var]) for _ in np.arange(0, itr): + df_itr_var = df_itr_var.chunk() rolling = df_itr_var.rolling(**kwargs) if self.method == "median": df_mv_avg_tmp = rolling.median() -- GitLab From 891e208f7b0d56da314e3cc7a3ef275dcb0ac2ae Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Tue, 16 Mar 2021 20:10:00 +0100 Subject: [PATCH 09/17] assure that concat data has no nans --- mlair/helpers/statistics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py index a8ba9795..3e99357c 100644 --- a/mlair/helpers/statistics.py +++ b/mlair/helpers/statistics.py @@ -440,7 +440,7 @@ class SkillScores: """Calculate CASE IV.""" AI, BI, CI, data, suffix = self.skill_score_pre_calculations(internal_data, observation_name, forecast_name) monthly_mean_external = self.create_monthly_mean_from_daily_data(external_data, index=data.index) - data = xr.concat([data, monthly_mean_external], dim="type") + data = xr.concat([data, monthly_mean_external], dim="type").dropna(dim="index") mean, sigma = suffix["mean"], suffix["sigma"] mean_external = monthly_mean_external.mean() sigma_external = np.sqrt(monthly_mean_external.var()) -- GitLab From b10bca2b5a4db56d1e5f9b36a9cfbe7fb094ece6 Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Wed, 17 Mar 2021 10:07:19 +0100 Subject: [PATCH 10/17] data handler single station has now a submethod make_input_target --- mlair/data_handler/data_handler_kz_filter.py | 9 ++---- .../data_handler_mixed_sampling.py | 15 ++-------- .../data_handler_single_station.py | 30 ++++++++++++++++--- 3 files changed, 30 insertions(+), 24 deletions(-) diff --git a/mlair/data_handler/data_handler_kz_filter.py b/mlair/data_handler/data_handler_kz_filter.py index 
78638a13..face8f3c 100644 --- a/mlair/data_handler/data_handler_kz_filter.py +++ b/mlair/data_handler/data_handler_kz_filter.py @@ -38,10 +38,7 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation): def _check_sampling(self, **kwargs): assert kwargs.get("sampling") == "hourly" # This data handler requires hourly data resolution - def setup_samples(self): - """ - Setup samples. This method prepares and creates samples X, and labels Y. - """ + def make_input_target(self): data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, self.station_type, self.network, self.store_data_locally, self.data_origin) self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, @@ -54,9 +51,6 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation): # import matplotlib.pyplot as plt # self.input_data.sel(filter="74d", variables="temp", Stations="DEBW107").plot() # self.input_data.sel(variables="temp", Stations="DEBW107").plot.line(hue="filter") - if self.do_transformation is True: - self.call_transform() - self.make_samples() @TimeTrackingWrapper def apply_kz_filter(self): @@ -88,6 +82,7 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation): return self.history.transpose(self.time_dim, self.window_dim, self.iter_dim, self.target_dim, self.filter_dim).copy() + class DataHandlerKzFilter(DefaultDataHandler): """Data handler using kz filtered data.""" diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index caaa7a62..ebcfbb42 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -54,15 +54,9 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation): assert len(parameter) == 2 # (inputs, targets) kwargs.update({parameter_name: parameter}) - def setup_samples(self): - """ - Setup samples. This method prepares and creates samples X, and labels Y. - """ + def make_input_target(self): self._data = list(map(self.load_and_interpolate, [0, 1])) # load input (0) and target (1) data self.set_inputs_and_targets() - if self.do_transformation is True: - self.call_transform() - self.make_samples() def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]: vars = [self.variables, self.target_var] @@ -104,19 +98,14 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi def _check_sampling(self, **kwargs): assert kwargs.get("sampling") == ("hourly", "daily") - def setup_samples(self): + def make_input_target(self): """ - Setup samples. This method prepares and creates samples X, and labels Y. - A KZ filter is applied on the input data that has hourly resolution. Lables Y are provided as aggregated values with daily resolution. 
""" self._data = list(map(self.load_and_interpolate, [0, 1])) # load input (0) and target (1) data self.set_inputs_and_targets() self.apply_kz_filter() - if self.do_transformation is True: - self.call_transform() - self.make_samples() def estimate_filter_width(self): """ diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index a894c635..820e601f 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -5,6 +5,7 @@ __date__ = '2020-07-20' import copy import datetime as dt +import hashlib import logging import os from functools import reduce @@ -54,10 +55,16 @@ class DataHandlerSingleStation(AbstractDataHandler): interpolation_limit: Union[int, Tuple[int]] = DEFAULT_INTERPOLATION_LIMIT, interpolation_method: Union[str, Tuple[str]] = DEFAULT_INTERPOLATION_METHOD, overwrite_local_data: bool = False, transformation=None, store_data_locally: bool = True, - min_length: int = 0, start=None, end=None, variables=None, data_origin: Dict = None, **kwargs): + min_length: int = 0, start=None, end=None, variables=None, data_origin: Dict = None, + lazy_loading: bool = False, **kwargs): super().__init__() self.station = helpers.to_list(station) self.path = self.setup_data_path(data_path, sampling) + self.lazy = lazy_loading + self.lazy_path = None + if self.lazy is True: + self.lazy_path = os.path.join(data_path, "lazy_data", self.__class__.__name__) + check_path_and_create(self.lazy_path) self.statistics_per_var = statistics_per_var self.data_origin = data_origin self.do_transformation = transformation is not None @@ -94,6 +101,7 @@ class DataHandlerSingleStation(AbstractDataHandler): self.observation = None # create samples + # self.hash() self.setup_samples() def __str__(self): @@ -215,15 +223,18 @@ class DataHandlerSingleStation(AbstractDataHandler): """ Setup samples. This method prepares and creates samples X, and labels Y. 
""" + self.make_input_target() + if self.do_transformation is True: + self.call_transform() + self.make_samples() + + def make_input_target(self): data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, self.station_type, self.network, self.store_data_locally, self.data_origin, self.start, self.end) self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit) self.set_inputs_and_targets() - if self.do_transformation is True: - self.call_transform() - self.make_samples() def set_inputs_and_targets(self): inputs = self._data.sel({self.target_dim: helpers.to_list(self.variables)}) @@ -658,6 +669,17 @@ class DataHandlerSingleStation(AbstractDataHandler): return self.transform(data, dim=dim, opts=self._transformation[pos], inverse=inverse, transformation_dim=self.target_dim) + def _get_hash(self): + hash_list = [self.station, self.statistics_per_var, self.data_origin, self.station_type, self.network, + self.sampling, self.target_dim, self.target_var, self.time_dim, self.iter_dim, self.window_dim, + self.window_history_size, self.window_history_offset, self.window_lead_time, + self.interpolation_limit, self.interpolation_method, self.min_length, self.start, self.end] + + hash = "".join([str(e) for e in hash_list]).encode("utf-8") + m = hashlib.sha256() + m.update(hash) + return m.hexdigest() + if __name__ == "__main__": # dp = AbstractDataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) -- GitLab From 3a3cc762fa0e8616aaf879e24532895304f34298 Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Wed, 17 Mar 2021 15:35:09 +0100 Subject: [PATCH 11/17] can create a hash from all important parameters, lazy loading works for all data handlers --- HPC_setup/requirements_HDFML_additionals.txt | 1 + HPC_setup/requirements_JUWELS_additionals.txt | 1 + mlair/data_handler/abstract_data_handler.py | 3 ++ mlair/data_handler/data_handler_kz_filter.py | 1 + .../data_handler_mixed_sampling.py | 37 +++++++++++-- .../data_handler_single_station.py | 54 ++++++++++++++----- requirements.txt | 1 + requirements_gpu.txt | 1 + .../test_data_handler_mixed_sampling.py | 2 +- 9 files changed, 85 insertions(+), 16 deletions(-) diff --git a/HPC_setup/requirements_HDFML_additionals.txt b/HPC_setup/requirements_HDFML_additionals.txt index 12e09ccd..7d6163a6 100644 --- a/HPC_setup/requirements_HDFML_additionals.txt +++ b/HPC_setup/requirements_HDFML_additionals.txt @@ -9,6 +9,7 @@ chardet==4.0.0 coverage==5.4 cycler==0.10.0 dask==2021.2.0 +dill==0.3.3 fsspec==0.8.5 gast==0.4.0 grpcio==1.35.0 diff --git a/HPC_setup/requirements_JUWELS_additionals.txt b/HPC_setup/requirements_JUWELS_additionals.txt index 12e09ccd..7d6163a6 100644 --- a/HPC_setup/requirements_JUWELS_additionals.txt +++ b/HPC_setup/requirements_JUWELS_additionals.txt @@ -9,6 +9,7 @@ chardet==4.0.0 coverage==5.4 cycler==0.10.0 dask==2021.2.0 +dill==0.3.3 fsspec==0.8.5 gast==0.4.0 grpcio==1.35.0 diff --git a/mlair/data_handler/abstract_data_handler.py b/mlair/data_handler/abstract_data_handler.py index f085d18b..419db059 100644 --- a/mlair/data_handler/abstract_data_handler.py +++ b/mlair/data_handler/abstract_data_handler.py @@ -55,3 +55,6 @@ class AbstractDataHandler: def get_coordinates(self) -> Union[None, Dict]: """Return coordinates as dictionary with keys `lon` and `lat`.""" return None + + def _hash_list(self): + return [] diff --git a/mlair/data_handler/data_handler_kz_filter.py 
b/mlair/data_handler/data_handler_kz_filter.py index face8f3c..1ff1a36f 100644 --- a/mlair/data_handler/data_handler_kz_filter.py +++ b/mlair/data_handler/data_handler_kz_filter.py @@ -22,6 +22,7 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation): """Data handler for a single station to be used by a superior data handler. Inputs are kz filtered.""" _requirements = remove_items(inspect.getfullargspec(DataHandlerSingleStation).args, ["self", "station"]) + _hash = DataHandlerSingleStation._hash + ["kz_filter_length", "kz_filter_iter", "filter_dim"] DEFAULT_FILTER_DIM = "filter" diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index ebcfbb42..acb62df9 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -12,6 +12,10 @@ import inspect from typing import Callable import datetime as dt from typing import Any +import os +import dill +import logging +from functools import partial import numpy as np import pandas as pd @@ -77,6 +81,12 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation): assert len(sampling) == 2 return list(map(lambda x: super(__class__, self).setup_data_path(data_path, x), sampling)) + def _extract_lazy(self, lazy_data): + _data, self.meta, _input_data, _target_data = lazy_data + f_prep = partial(self._slice_prep, start=self.start, end=self.end) + self._data = f_prep(_data[0]), f_prep(_data[1]) + self.input_data, self.target_data = list(map(f_prep, [_input_data, _target_data])) + class DataHandlerMixedSampling(DefaultDataHandler): """Data handler using mixed sampling for input and target.""" @@ -119,14 +129,24 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi new_date = dt.datetime.strptime(date, "%Y-%m-%d") + dt.timedelta(hours=delta) return new_date.strftime("%Y-%m-%d") - def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]: - + def update_start_end(self, ind): if ind == 0: # for inputs estimated_filter_width = self.estimate_filter_width() start = self._add_time_delta(self.start, -estimated_filter_width) end = self._add_time_delta(self.end, estimated_filter_width) else: # target start, end = self.start, self.end + return start, end + + def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]: + + start, end = self.update_start_end(ind) + # if ind == 0: # for inputs + # estimated_filter_width = self.estimate_filter_width() + # start = self._add_time_delta(self.start, -estimated_filter_width) + # end = self._add_time_delta(self.end, estimated_filter_width) + # else: # target + # start, end = self.start, self.end vars = [self.variables, self.target_var] stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) @@ -138,6 +158,16 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi limit=self.interpolation_limit[ind]) return data + def _create_lazy_data(self): + return [self._data, self.meta, self.input_data, self.target_data, self.cutoff_period, self.cutoff_period_days] + + def _extract_lazy(self, lazy_data): + _data, self.meta, _input_data, _target_data, self.cutoff_period, self.cutoff_period_days = lazy_data + start_inp, end_inp = self.update_start_end(0) + self._data = list(map(self._slice_prep, _data, [start_inp, self.start], [end_inp, self.end])) + self.input_data = self._slice_prep(_input_data, start_inp, end_inp) + self.target_data = self._slice_prep(_target_data, self.start, self.end) + class 
DataHandlerMixedSamplingWithFilter(DefaultDataHandler): """Data handler using mixed sampling for input and target. Inputs are temporal filtered.""" @@ -158,6 +188,7 @@ class DataHandlerSeparationOfScalesSingleStation(DataHandlerMixedSamplingWithFil """ _requirements = DataHandlerMixedSamplingWithFilterSingleStation.requirements() + _hash = DataHandlerMixedSamplingWithFilterSingleStation._hash + ["time_delta"] def __init__(self, *args, time_delta=np.sqrt, **kwargs): assert isinstance(time_delta, Callable) @@ -193,7 +224,7 @@ class DataHandlerSeparationOfScalesSingleStation(DataHandlerMixedSamplingWithFil time_deltas = np.round(self.time_delta(self.cutoff_period)).astype(int) start, end = window, 1 res = [] - window_array = self.create_index_array(self.window_dim.range(start, end), squeeze_dim=self.target_dim) + window_array = self.create_index_array(self.window_dim, range(start, end), squeeze_dim=self.target_dim) for delta, filter_name in zip(np.append(time_deltas, 1), data.coords["filter"]): res_filter = [] data_filter = data.sel({"filter": filter_name}) diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index 820e601f..a8c6ea2e 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -5,10 +5,11 @@ __date__ = '2020-07-20' import copy import datetime as dt +import dill import hashlib import logging import os -from functools import reduce +from functools import reduce, partial from typing import Union, List, Iterable, Tuple, Dict, Optional import numpy as np @@ -46,6 +47,10 @@ class DataHandlerSingleStation(AbstractDataHandler): DEFAULT_INTERPOLATION_LIMIT = 0 DEFAULT_INTERPOLATION_METHOD = "linear" + _hash = ["station", "statistics_per_var", "data_origin", "station_type", "network", "sampling", "target_dim", + "target_var", "time_dim", "iter_dim", "window_dim", "window_history_size", "window_history_offset", + "window_lead_time", "interpolation_limit", "interpolation_method"] + def __init__(self, station, data_path, statistics_per_var, station_type=DEFAULT_STATION_TYPE, network=DEFAULT_NETWORK, sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING, target_dim=DEFAULT_TARGET_DIM, target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM, @@ -101,7 +106,6 @@ class DataHandlerSingleStation(AbstractDataHandler): self.observation = None # create samples - # self.hash() self.setup_samples() def __str__(self): @@ -223,11 +227,41 @@ class DataHandlerSingleStation(AbstractDataHandler): """ Setup samples. This method prepares and creates samples X, and labels Y. 
""" - self.make_input_target() + if self.lazy is False: + self.make_input_target() + else: + self.load_lazy() + self.store_lazy() if self.do_transformation is True: self.call_transform() self.make_samples() + def store_lazy(self): + hash = self._get_hash() + filename = os.path.join(self.lazy_path, hash + ".pickle") + if not os.path.exists(filename): + dill.dump(self._create_lazy_data(), file=open(filename, "wb")) + + def _create_lazy_data(self): + return [self._data, self.meta, self.input_data, self.target_data] + + def load_lazy(self): + hash = self._get_hash() + filename = os.path.join(self.lazy_path, hash + ".pickle") + try: + with open(filename, "rb") as pickle_file: + lazy_data = dill.load(pickle_file) + self._extract_lazy(lazy_data) + logging.info("<<<loaded lazy file") + except FileNotFoundError: + logging.info(">>>could not load lazy file") + self.make_input_target() + + def _extract_lazy(self, lazy_data): + _data, self.meta, _input_data, _target_data = lazy_data + f_prep = partial(self._slice_prep, start=self.start, end=self.end) + self._data, self.input_data, self.target_data = list(map(f_prep, [_data, _input_data, _target_data])) + def make_input_target(self): data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, self.station_type, self.network, self.store_data_locally, self.data_origin, @@ -669,16 +703,12 @@ class DataHandlerSingleStation(AbstractDataHandler): return self.transform(data, dim=dim, opts=self._transformation[pos], inverse=inverse, transformation_dim=self.target_dim) + def _hash_list(self): + return sorted(list(set(self._hash))) + def _get_hash(self): - hash_list = [self.station, self.statistics_per_var, self.data_origin, self.station_type, self.network, - self.sampling, self.target_dim, self.target_var, self.time_dim, self.iter_dim, self.window_dim, - self.window_history_size, self.window_history_offset, self.window_lead_time, - self.interpolation_limit, self.interpolation_method, self.min_length, self.start, self.end] - - hash = "".join([str(e) for e in hash_list]).encode("utf-8") - m = hashlib.sha256() - m.update(hash) - return m.hexdigest() + hash = "".join([str(self.__getattribute__(e)) for e in self._hash_list()]).encode() + return hashlib.md5(hash).hexdigest() if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt index b0a6e7f5..af742fde 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,7 @@ chardet==4.0.0 coverage==5.4 cycler==0.10.0 dask==2021.2.0 +dill==0.3.3 fsspec==0.8.5 gast==0.4.0 grpcio==1.35.0 diff --git a/requirements_gpu.txt b/requirements_gpu.txt index 35fe0d5e..7dd443a4 100644 --- a/requirements_gpu.txt +++ b/requirements_gpu.txt @@ -9,6 +9,7 @@ chardet==4.0.0 coverage==5.4 cycler==0.10.0 dask==2021.2.0 +dill==0.3.3 fsspec==0.8.5 gast==0.4.0 grpcio==1.35.0 diff --git a/test/test_data_handler/test_data_handler_mixed_sampling.py b/test/test_data_handler/test_data_handler_mixed_sampling.py index d2f9ce00..2a6553b7 100644 --- a/test/test_data_handler/test_data_handler_mixed_sampling.py +++ b/test/test_data_handler/test_data_handler_mixed_sampling.py @@ -37,7 +37,7 @@ class TestDataHandlerMixedSamplingSingleStation: req = object.__new__(DataHandlerSingleStation) assert sorted(obj._requirements) == sorted(remove_items(req.requirements(), "station")) - @mock.patch("mlair.data_handler.data_handler_mixed_sampling.DataHandlerMixedSamplingSingleStation.setup_samples") + @mock.patch("mlair.data_handler.data_handler_single_station.DataHandlerSingleStation.setup_samples") def 
test_init(self, mock_super_init): obj = DataHandlerMixedSamplingSingleStation("first_arg", "second", {}, test=23, sampling="hourly", interpolation_limit=(1, 10)) -- GitLab From b5f3f9ec817085473a0f65c30d22c75ff2fc30f8 Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Wed, 17 Mar 2021 15:58:38 +0100 Subject: [PATCH 12/17] renamed lazy_loading to lazy_preprocessing --- mlair/data_handler/data_handler_kz_filter.py | 9 +++++++++ mlair/data_handler/data_handler_mixed_sampling.py | 6 ------ mlair/data_handler/data_handler_single_station.py | 6 ++---- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/mlair/data_handler/data_handler_kz_filter.py b/mlair/data_handler/data_handler_kz_filter.py index 1ff1a36f..1f2c63e5 100644 --- a/mlair/data_handler/data_handler_kz_filter.py +++ b/mlair/data_handler/data_handler_kz_filter.py @@ -8,6 +8,7 @@ import numpy as np import pandas as pd import xarray as xr from typing import List, Union +from functools import partial from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation from mlair.data_handler import DefaultDataHandler @@ -83,6 +84,14 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation): return self.history.transpose(self.time_dim, self.window_dim, self.iter_dim, self.target_dim, self.filter_dim).copy() + def _create_lazy_data(self): + return [self._data, self.meta, self.input_data, self.target_data, self.cutoff_period, self.cutoff_period_days] + + def _extract_lazy(self, lazy_data): + _data, self.meta, _input_data, _target_data, self.cutoff_period, self.cutoff_period_days = lazy_data + f_prep = partial(self._slice_prep, start=self.start, end=self.end) + self._data, self.input_data, self.target_data = list(map(f_prep, [_data, _input_data, _target_data])) + class DataHandlerKzFilter(DefaultDataHandler): """Data handler using kz filtered data.""" diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index acb62df9..b359a26d 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -12,9 +12,6 @@ import inspect from typing import Callable import datetime as dt from typing import Any -import os -import dill -import logging from functools import partial import numpy as np @@ -158,9 +155,6 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi limit=self.interpolation_limit[ind]) return data - def _create_lazy_data(self): - return [self._data, self.meta, self.input_data, self.target_data, self.cutoff_period, self.cutoff_period_days] - def _extract_lazy(self, lazy_data): _data, self.meta, _input_data, _target_data, self.cutoff_period, self.cutoff_period_days = lazy_data start_inp, end_inp = self.update_start_end(0) diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index a8c6ea2e..0497bee0 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -61,11 +61,11 @@ class DataHandlerSingleStation(AbstractDataHandler): interpolation_method: Union[str, Tuple[str]] = DEFAULT_INTERPOLATION_METHOD, overwrite_local_data: bool = False, transformation=None, store_data_locally: bool = True, min_length: int = 0, start=None, end=None, variables=None, data_origin: Dict = None, - lazy_loading: bool = False, **kwargs): + lazy_preprocessing: bool = False, **kwargs): super().__init__() self.station = helpers.to_list(station) 
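Taken together, PATCHES 10-12 turn preprocessing into a hash-keyed cache: every parameter that influences the result feeds an md5 key, and dill persists the prepared arrays under that key. A condensed standalone version of the pattern — a hypothetical helper, not the MLAir class itself:

import hashlib
import os
import dill  # unlike stdlib pickle, dill also serialises lambdas/partials

class LazyCache:
    def __init__(self, path, **hash_relevant):
        os.makedirs(path, exist_ok=True)
        # concatenate everything that influences preprocessing into the key
        key = "".join(str(hash_relevant[k]) for k in sorted(hash_relevant)).encode()
        self._file = os.path.join(path, hashlib.md5(key).hexdigest() + ".pickle")

    def get(self, compute):
        try:  # reuse stored data if an identical configuration ran before
            with open(self._file, "rb") as f:
                return dill.load(f)
        except FileNotFoundError:
            data = compute()
            with open(self._file, "wb") as f:
                dill.dump(data, f)
            return data

cache = LazyCache("/tmp/lazy_data", station="DEBW107", sampling="hourly")
data = cache.get(lambda: [1, 2, 3])  # expensive preprocessing goes here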
self.path = self.setup_data_path(data_path, sampling) - self.lazy = lazy_loading + self.lazy = lazy_preprocessing self.lazy_path = None if self.lazy is True: self.lazy_path = os.path.join(data_path, "lazy_data", self.__class__.__name__) @@ -252,9 +252,7 @@ class DataHandlerSingleStation(AbstractDataHandler): with open(filename, "rb") as pickle_file: lazy_data = dill.load(pickle_file) self._extract_lazy(lazy_data) - logging.info("<<<loaded lazy file") except FileNotFoundError: - logging.info(">>>could not load lazy file") self.make_input_target() def _extract_lazy(self, lazy_data): -- GitLab From f9c10fe3065a696dcdca91ac8193afe8640b53ad Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Wed, 17 Mar 2021 16:06:24 +0100 Subject: [PATCH 13/17] replace all pickle calls with dill calls --- mlair/data_handler/default_data_handler.py | 5 +++-- mlair/data_handler/iterator.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index ddf276cf..07a866ae 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -8,6 +8,7 @@ import gc import logging import os import pickle +import dill import shutil from functools import reduce from typing import Tuple, Union, List @@ -86,7 +87,7 @@ class DefaultDataHandler(AbstractDataHandler): data = {"X": self._X, "Y": self._Y, "X_extreme": self._X_extreme, "Y_extreme": self._Y_extreme} data = self._force_dask_computation(data) with open(self._save_file, "wb") as f: - pickle.dump(data, f) + dill.dump(data, f) logging.debug(f"save pickle data to {self._save_file}") self._reset_data() @@ -101,7 +102,7 @@ class DefaultDataHandler(AbstractDataHandler): def _load(self): try: with open(self._save_file, "rb") as f: - data = pickle.load(f) + data = dill.load(f) logging.debug(f"load pickle data from {self._save_file}") self._X, self._Y = data["X"], data["Y"] self._X_extreme, self._Y_extreme = data["X_extreme"], data["Y_extreme"] diff --git a/mlair/data_handler/iterator.py b/mlair/data_handler/iterator.py index 30c45417..564bf3bf 100644 --- a/mlair/data_handler/iterator.py +++ b/mlair/data_handler/iterator.py @@ -9,6 +9,7 @@ import math import os import shutil import pickle +import dill from typing import Tuple, List @@ -109,7 +110,7 @@ class KerasIterator(keras.utils.Sequence): """Load pickle data from disk.""" file = self._path % index with open(file, "rb") as f: - data = pickle.load(f) + data = dill.load(f) return data["X"], data["Y"] @staticmethod @@ -167,7 +168,7 @@ class KerasIterator(keras.utils.Sequence): data = {"X": X, "Y": Y} file = self._path % index with open(file, "wb") as f: - pickle.dump(data, f) + dill.dump(data, f) def _get_number_of_mini_batches(self, number_of_samples: int) -> int: """Return number of mini batches as the floored ration of number of samples to batch size.""" -- GitLab From 32f3ff2203d47fea8ad1c2df7328506a7e5cd058 Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Wed, 17 Mar 2021 18:55:22 +0100 Subject: [PATCH 14/17] data handlers with filters will create negative values, which is incompatible with log transformation. 
standardization will be used in this cases --- mlair/data_handler/data_handler_kz_filter.py | 15 ++++++++++++++- mlair/data_handler/data_handler_mixed_sampling.py | 2 +- mlair/data_handler/data_handler_single_station.py | 5 +++-- mlair/data_handler/default_data_handler.py | 4 +++- 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/mlair/data_handler/data_handler_kz_filter.py b/mlair/data_handler/data_handler_kz_filter.py index 1f2c63e5..539712b3 100644 --- a/mlair/data_handler/data_handler_kz_filter.py +++ b/mlair/data_handler/data_handler_kz_filter.py @@ -7,7 +7,7 @@ import inspect import numpy as np import pandas as pd import xarray as xr -from typing import List, Union +from typing import List, Union, Tuple, Optional from functools import partial from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation @@ -37,6 +37,19 @@ class DataHandlerKzFilterSingleStation(DataHandlerSingleStation): self.cutoff_period_days = None super().__init__(*args, **kwargs) + def setup_transformation(self, transformation: Union[None, dict, Tuple]) -> Tuple[Optional[dict], Optional[dict]]: + """ + Adjust setup of transformation because kfz filtered data will have negative values which is not compatible with + the log transformation. Therefore, replace all log transformation methods by a default standardization. This is + only applied on input side. + """ + transformation = super(__class__, self).setup_transformation(transformation) + if transformation[0] is not None: + for k, v in transformation[0].items(): + if v["method"] == "log": + transformation[0][k]["method"] = "standardise" + return transformation + def _check_sampling(self, **kwargs): assert kwargs.get("sampling") == "hourly" # This data handler requires hourly data resolution diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index 86e6f856..75e9e645 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -158,7 +158,7 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi def _extract_lazy(self, lazy_data): _data, self.meta, _input_data, _target_data, self.cutoff_period, self.cutoff_period_days = lazy_data start_inp, end_inp = self.update_start_end(0) - self._data = list(map(self._slice_prep, _data, [start_inp, self.start], [end_inp, self.end])) + self._data = list(map(lambda x: self._slice_prep(_data[x], *self.update_start_end(x)), [0, 1])) self.input_data = self._slice_prep(_input_data, start_inp, end_inp) self.target_data = self._slice_prep(_target_data, self.start, self.end) diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index 0497bee0..19ff6fa1 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -252,7 +252,9 @@ class DataHandlerSingleStation(AbstractDataHandler): with open(filename, "rb") as pickle_file: lazy_data = dill.load(pickle_file) self._extract_lazy(lazy_data) + logging.info(f"{self.station}: used lazy data") except FileNotFoundError: + logging.info(f"{self.station}: could not use lazy data") self.make_input_target() def _extract_lazy(self, lazy_data): @@ -594,8 +596,7 @@ class DataHandlerSingleStation(AbstractDataHandler): """ return data.loc[{coord: slice(str(start), str(end))}] - @staticmethod - def setup_transformation(transformation: Union[None, dict, Tuple]) -> Tuple[Optional[dict], Optional[dict]]: + 
def setup_transformation(self, transformation: Union[None, dict, Tuple]) -> Tuple[Optional[dict], Optional[dict]]: """ Set up transformation by extracting all relevant information. diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index 07a866ae..5eb6fd02 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -273,7 +273,9 @@ class DefaultDataHandler(AbstractDataHandler): if var not in transformation_dict[i].keys(): transformation_dict[i][var] = {} opts = transformation[var] - assert transformation_dict[i][var].get("method", opts["method"]) == opts["method"] + if not transformation_dict[i][var].get("method", opts["method"]) == opts["method"]: + # data handlers with filters are allowed to change transformation method to standardise + assert hasattr(dh, "filter_dim") and opts["method"] == "standardise" transformation_dict[i][var]["method"] = opts["method"] for k in ["mean", "std", "min", "max"]: old = transformation_dict[i][var].get(k, None) -- GitLab From a72c04dc95e4532d5217914d494453544aeeb7f7 Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Wed, 17 Mar 2021 19:51:43 +0100 Subject: [PATCH 15/17] corrected get statement, plot still not working --- mlair/run_modules/post_processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py index 3b9b5634..73aebb00 100644 --- a/mlair/run_modules/post_processing.py +++ b/mlair/run_modules/post_processing.py @@ -306,7 +306,7 @@ class PostProcessing(RunEnvironment): try: if ("filter" in self.test_data[0].get_X(as_numpy=False)[0].coords) and ( "PlotSeparationOfScales" in plot_list): - filter_dim = self.data_store.get("filter_dim", None) + filter_dim = self.data_store.get_default("filter_dim", None) PlotSeparationOfScales(self.test_data, plot_folder=self.plot_path, time_dim=time_dim, window_dim=window_dim, target_dim=target_dim, **{"filter_dim": filter_dim}) except Exception as e: -- GitLab From 145b3d7dce710d583a9ffe7078630b681d6f0e64 Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Thu, 18 Mar 2021 09:43:51 +0100 Subject: [PATCH 16/17] load lazy now just drops a debug message, /close #290 --- mlair/data_handler/data_handler_single_station.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index 19ff6fa1..0c83e625 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -252,9 +252,9 @@ class DataHandlerSingleStation(AbstractDataHandler): with open(filename, "rb") as pickle_file: lazy_data = dill.load(pickle_file) self._extract_lazy(lazy_data) - logging.info(f"{self.station}: used lazy data") + logging.debug(f"{self.station[0]}: used lazy data") except FileNotFoundError: - logging.info(f"{self.station}: could not use lazy data") + logging.debug(f"{self.station[0]}: could not use lazy data") self.make_input_target() -- GitLab From 0f53dffaa0b7c8eb08fb7040b1375423732cb883 Mon Sep 17 00:00:00 2001 From: Felix Kleinert <f.kleinert@fz-juelich.de> Date: Tue, 23 Mar 2021 17:07:17 +0100 Subject: [PATCH 17/17] update transform methods to properly work when external transformation parameters are provided --- mlair/data_handler/data_handler_single_station.py | 12 +++++++++++-
mlair/data_handler/default_data_handler.py | 12 ++++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index 0c83e625..e9db27a9 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -195,7 +195,17 @@ class DataHandlerSingleStation(AbstractDataHandler): else: raise NotImplementedError - def f_apply(data, method, mean=None, std=None, min=None, max=None): + def f_apply(data, method, **kwargs): + for k, v in kwargs.items(): + if not (isinstance(v, xr.DataArray) or v is None): + _, opts = statistics.min_max(data, dim) + helper = xr.ones_like(opts['min']) + kwargs[k] = helper * v + mean = kwargs.pop('mean', None) + std = kwargs.pop('std', None) + min = kwargs.pop('min', None) + max = kwargs.pop('max', None) + if method == "standardise": return statistics.standardise_apply(data, mean, std), {"mean": mean, "std": std, "method": method} elif method == "centre": diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index 5eb6fd02..2eceff32 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -241,6 +241,8 @@ class DefaultDataHandler(AbstractDataHandler): * standardise (default, if method is not given) * centre + * min_max + * log ### mean and std estimation @@ -256,14 +258,16 @@ class DefaultDataHandler(AbstractDataHandler): If mean and std are not None, the default data handler expects this parameters to match the data and applies this values to the data. Make sure that all dimensions and/or coordinates are in agreement. + + ### min and max given + If min and max are not None, the default data handler expects this parameters to match the data and applies + this values to the data. Make sure that all dimensions and/or coordinates are in agreement. """ sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls._requirements if k in kwargs} - transformation_dict = sp_keys.get("transformation", None) - if transformation_dict is None: + if "transformation" not in sp_keys.keys(): return - if isinstance(transformation_dict, dict): # tuple for (input, target) transformation - transformation_dict = copy.deepcopy(transformation_dict), copy.deepcopy(transformation_dict) + transformation_dict = ({}, {}) def _inner(): """Inner method that is performed in both serial and parallel approach.""" -- GitLab
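The f_apply change in PATCH 17 lets externally supplied scalar statistics stand in for per-variable DataArrays by broadcasting them before the transformation is applied. The trick in isolation, on toy data with illustrative names — a minimal sketch, not MLAir code:

import numpy as np
import xarray as xr

data = xr.DataArray(np.random.rand(24, 2), dims=("datetime", "variables"),
                    coords={"variables": ["o3", "temp"]})
mean, std = 0.5, 2.0  # external scalars instead of per-variable DataArrays
# multiplying a per-variable array of ones by the scalar yields a DataArray
# indexed like the data, so the arithmetic below broadcasts as intended
helper = xr.ones_like(data.min("datetime"))
mean, std = helper * mean, helper * std
standardised = (data - mean) / std
print(standardised.sel(variables="o3").mean().item())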