diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py
index 785eb7dffff28a676342feace519a6db0871c1df..53cfa4cad4ade0f5ed988a8598fb8f4fe70a1779 100644
--- a/mlair/data_handler/data_handler_with_filter.py
+++ b/mlair/data_handler/data_handler_with_filter.py
@@ -4,6 +4,7 @@ __author__ = 'Lukas Leufen'
 __date__ = '2020-08-26'
 
 import inspect
+import copy
 import numpy as np
 import pandas as pd
 import xarray as xr
@@ -12,7 +13,7 @@ from functools import partial
 import logging
 
 from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation
 from mlair.data_handler import DefaultDataHandler
-from mlair.helpers import remove_items, to_list, TimeTrackingWrapper
+from mlair.helpers import remove_items, to_list, TimeTrackingWrapper, statistics
 from mlair.helpers.filter import KolmogorovZurbenkoFilterMovingWindow as KZFilter
 from mlair.helpers.filter import FIRFilter, ClimateFIRFilter, omega_null_kzf
@@ -162,6 +163,7 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
         self.filter_order = self._prepare_filter_order(filter_order, removed_index, self.fs)
         self.filter_window_type = filter_window_type
         self._add_unfiltered = filter_add_unfiltered
+        self.unfiltered_name = "unfiltered"
         super().__init__(*args, **kwargs)
 
     @staticmethod
@@ -250,7 +252,7 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
                 index.append(f"band{band_num}")
                 band_num += 1
         if self._add_unfiltered:
-            index.append("unfiltered")
+            index.append(self.unfiltered_name)
         self.filter_dim_order = index
         return pd.Index(index, name=self.filter_dim)
 
@@ -261,6 +263,97 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
         _data, _meta, _input_data, _target_data, self.fir_coeff, self.filter_dim_order = lazy_data
         super(__class__, self)._extract_lazy((_data, _meta, _input_data, _target_data))
 
+    def setup_transformation(self, transformation: Union[None, dict, Tuple]) -> Tuple[Optional[dict], Optional[dict]]:
+        """
+        Adjust the transformation setup because filtered data contain negative values, which are incompatible with
+        the log transformation. Therefore, replace all log and min-max transformation methods by a default
+        standardisation (keeping the original method for the unfiltered branch, e.g. "log/standardise"). This is
+        only applied on the input side.
+        """
+        transformation = DataHandlerSingleStation.setup_transformation(self, transformation)
+        if transformation[0] is not None:
+            unfiltered_option = lambda x: f"{x}/standardise" if self._add_unfiltered is True else "standardise"
+            for k, v in transformation[0].items():
+                if v["method"] in ["log", "min_max"]:
+                    transformation[0][k]["method"] = unfiltered_option(v["method"])
+        return transformation
+
+    def transform(self, data_in, dim: Union[str, int] = 0, inverse: bool = False, opts=None,
+                  transformation_dim=None):
+        """
+        Transform data according to the given transformation settings.
+
+        This function transforms an xarray.DataArray (along dim) or pandas.DataFrame (along axis) to mean=0 and
+        std=1 (`method=standardise`), centres the data to mean=0 without changing its scale (`method=centre`),
+        scales it to a fixed range (`method=min_max`), or applies a log transformation (`method=log`).
+        Furthermore, the options used for each variable are returned so that they can later be passed to the
+        inverse transformation.
+
+        :param string/int dim: This parameter is not used for the inverse transformation.
+            | for xarray.DataArray as string: name of the dimension that should be standardised
+            | for pandas.DataFrame as int: axis of the dimension that should be standardised
+        :param inverse: Switch between transformation and inverse transformation.
+        :param opts: Transformation options per variable. If properties like mean or min are given for a variable,
+            they are applied directly; otherwise they are first calculated from the data.
+        :param transformation_dim: Dimension to iterate the variables over (defaults to `DEFAULT_TARGET_DIM`).
+
+        :return: Transformed data as xarray.DataArray (or pandas.DataFrame) and a dict with the transformation
+            options applied per variable (method plus mean/std or min/max/feature_range).
+        """
+
+        if transformation_dim is None:
+            transformation_dim = self.DEFAULT_TARGET_DIM
+
+        def f(data, method="standardise", feature_range=None):
+            if method == "standardise":
+                return statistics.standardise(data, dim)
+            elif method == "centre":
+                return statistics.centre(data, dim)
+            elif method == "min_max":
+                kwargs = {"feature_range": feature_range} if feature_range is not None else {}
+                return statistics.min_max(data, dim, **kwargs)
+            elif method == "log":
+                return statistics.log(data, dim)
+            else:
+                raise NotImplementedError
+
+        def f_apply(data, method, **kwargs):
+            for k, v in kwargs.items():
+                if not (isinstance(v, xr.DataArray) or v is None):
+                    _, opts = statistics.min_max(data, dim)
+                    helper = xr.ones_like(opts['min'])
+                    kwargs[k] = helper * v
+            mean = kwargs.pop('mean', None)
+            std = kwargs.pop('std', None)
+            min = kwargs.pop('min', None)
+            max = kwargs.pop('max', None)
+            feature_range = kwargs.pop('feature_range', None)
+
+            if method == "standardise":
+                return statistics.standardise_apply(data, mean, std), {"mean": mean, "std": std, "method": method}
+            elif method == "centre":
+                return statistics.centre_apply(data, mean), {"mean": mean, "method": method}
+            elif method == "min_max":
+                return statistics.min_max_apply(data, min, max), {"min": min, "max": max, "method": method,
+                                                                  "feature_range": feature_range}
+            elif method == "log":
+                return statistics.log_apply(data, mean, std), {"mean": mean, "std": std, "method": method}
+            else:
+                raise NotImplementedError
+
+        opts = opts or {}
+        opts_updated = {}
+        if not inverse:
+            transformed_values = []
+            for var in data_in.variables.values:
+                data_var = data_in.sel(**{transformation_dim: [var]})
+                var_opts = opts.get(var, {})
+                _apply = (var_opts.get("mean", None) is not None) or (var_opts.get("min") is not None)
+                values, new_var_opts = locals()["f_apply" if _apply else "f"](data_var, **var_opts)
+                opts_updated[var] = copy.deepcopy(new_var_opts)
+                transformed_values.append(values)
+            return xr.concat(transformed_values, dim=transformation_dim), opts_updated
+        else:
+            return self.inverse_transform(data_in, opts, transformation_dim)
+
 
 class DataHandlerFirFilter(DataHandlerFilter):
     """Data handler using FIR filtered data."""
@@ -399,7 +492,8 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
         # add unfiltered raw data
         if self._add_unfiltered is True:
             data_raw = self.shift(self.input_data, self.time_dim, -self.window_history_size)
-            data_raw = data_raw.expand_dims({self.filter_dim: ["unfiltered"]}, -1)
+            filter_dim = self.create_filter_index(add_unfiltered_index=True)[-1]
+            data_raw = data_raw.expand_dims({self.filter_dim: [filter_dim]}, -1)
             input_data = xr.concat([input_data, data_raw], self.filter_dim)
 
         self.input_data = input_data
@@ -423,7 +517,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
         index = list(map(f, index.tolist()))
         index = list(map(lambda x: str(x) + "d", index)) + ["res"]
         if self._add_unfiltered and add_unfiltered_index:
-            index.append("unfiltered")
+            index.append(self.unfiltered_name)
         self.filter_dim_order = index
         return pd.Index(index, name=self.filter_dim)
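Note: the new `setup_transformation` override exists because FIR band output oscillates around zero, so a log transformation is undefined for a large share of the values. A minimal sketch of the failure mode and the standardise fallback (illustrative only, not MLAir code; the variable names are made up):

```python
import numpy as np
import xarray as xr

# Strictly positive raw signal, the case a log transformation expects.
raw = xr.DataArray(np.random.lognormal(mean=1.0, sigma=0.5, size=100), dims="datetime")

# Stand-in for a FIR band: filtered output oscillates around zero.
band = raw - raw.mean()

with np.errstate(invalid="ignore"):
    assert bool(np.isnan(np.log(band)).any())  # log breaks on the negative values

# Standardisation remains well defined on any band.
standardised = (band - band.mean()) / band.std()
print(float(standardised.mean()), float(standardised.std()))  # ~0.0, ~1.0
```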
diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py
index b71caa681b5e1823ce55fbb3ee7d7afb77df7dba..543cff3624577ac617733b8b593c5f52f25196b3 100644
--- a/mlair/helpers/filter.py
+++ b/mlair/helpers/filter.py
@@ -384,7 +384,7 @@ class ClimateFIRFilter:
             _end = pd.to_datetime(data.coords[time_dim].max().values).year
             filt_coll = []
             for _year in range(_start, _end + 1):
-                logging.info(f"{data.coords['Stations'].values[0]} ({var}): year={_year}")
+                logging.debug(f"{data.coords['Stations'].values[0]} ({var}): year={_year}")
 
                 time_slice = self._create_time_range_extend(_year, sampling, extend_length_history)
                 d = data.sel({var_dim: [var], time_dim: time_slice})
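Note: the per-year progress line is demoted from INFO to DEBUG, so it disappears from default runs. A hedged sketch of how to bring it back for long climate-filter computations (MLAir installs its own logging handlers, so this may need to run before the workflow configures logging):

```python
import logging

# Lower the root logger threshold so ClimateFIRFilter's per-year
# "station (var): year=..." messages become visible again.
logging.getLogger().setLevel(logging.DEBUG)
```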
diff --git a/run_climate_filter.py b/run_climate_filter.py
index 4aacab8817b2f6350de861ef383b4777790bc57c..e6cae785f817c11cff2fe5316ff0a405807d3557 100755
--- a/run_climate_filter.py
+++ b/run_climate_filter.py
@@ -5,6 +5,8 @@ import argparse
 
 from mlair.workflows import DefaultWorkflow
 from mlair.data_handler.data_handler_mixed_sampling import DataHandlerMixedSamplingWithClimateFirFilter
 
+from mlair.model_modules.fully_connected_networks import BranchedInputFCN as NN
+
 stats = {'o3': 'dma8eu', 'no': 'dma8eu', 'no2': 'dma8eu',
          'relhum': 'average_values', 'u': 'average_values', 'v': 'average_values',
@@ -19,25 +21,45 @@ data_origin = {'o3': '', 'no': '', 'no2': '',
 
 
 def main(parser_args):
     args = dict(stations=["DEBW107", "DEBW013"],
                 network="UBA",
-                evaluate_bootstraps=False, plot_list=[],
+                evaluate_bootstraps=False, plot_list=["PlotDataHistogram"],
                 data_origin=data_origin,
                 data_handler=DataHandlerMixedSamplingWithClimateFirFilter,
                 interpolation_limit=(3, 1), overwrite_local_data=False,
+                overwrite_lazy_data=False,
                 lazy_preprocessing=True,
                 use_multiprocessing=True,
-                use_multiprocessing_on_debug=True,
+                use_multiprocessing_on_debug=False,
                 sampling=("hourly", "daily"),
                 statistics_per_var=stats,
                 create_new_model=True, train_model=False, epochs=1,
-                window_history_size=6 * 24 + 16,
+                # window_history_size=6 * 24 + 16,
+                window_history_size=2 * 24 + 16,
                 window_history_offset=16,
                 kz_filter_length=[100 * 24, 15 * 24],
                 kz_filter_iter=[4, 5],
-                filter_cutoff_period=[7, 0.8],
-                filter_order=[7, 2],
+                # filter_cutoff_period=[7, 0.8],
+                # filter_order=[7, 2],
+                # filter_cutoff_period=[21, 2.7, 11/24.],
+                # filter_order=[42, 7, 2],
+                filter_cutoff_period=[3],
+                filter_order=[7],
+                # extreme_values=[1.5, 1.75, 2, 2.25, 2.5, 3],
+                filter_add_unfiltered=True,
                 start="2006-01-01", train_start="2006-01-01",
                 end="2011-12-31", test_end="2011-12-31",
+                model=NN,
+                use_filter_branches=True,
+                transformation={
+                    "o3": {"method": "log"},
+                    "no": {"method": "log"},
+                    "no2": {"method": "log"},
+                    "cloudcover": {"method": "min_max"},
+                    "pblheight": {"method": "log"},
+                    "relhum": {"method": "min_max"},
+                    "temp": {"method": "standardise"},
+                    "u": {"method": "standardise"},
+                    "v": {"method": "standardise"},
+                },  # T1F, also apply same target transformation
                 **parser_args.__dict__,
                 )
     workflow = DefaultWorkflow(**args, start_script=__file__)
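Note: with `filter_add_unfiltered=True`, the per-variable `transformation` dict above is rewritten by the new `setup_transformation` before use: every input-side `log` or `min_max` entry becomes `"<method>/standardise"`, keeping the original method for the unfiltered branch while the filter bands are standardised. A standalone sketch of that rewrite (same rule as in the diff, lifted out of the class for illustration):

```python
transformation = {"o3": {"method": "log"}, "relhum": {"method": "min_max"},
                  "temp": {"method": "standardise"}}
add_unfiltered = True  # mirrors filter_add_unfiltered

# Same rewrite rule as DataHandlerFirFilterSingleStation.setup_transformation.
unfiltered_option = lambda m: f"{m}/standardise" if add_unfiltered else "standardise"
for var, var_opts in transformation.items():
    if var_opts["method"] in ["log", "min_max"]:
        var_opts["method"] = unfiltered_option(var_opts["method"])

print(transformation)
# {'o3': {'method': 'log/standardise'}, 'relhum': {'method': 'min_max/standardise'},
#  'temp': {'method': 'standardise'}}
```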