diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py
index 8205ae6c28f3683b1052c292e5d063d8bca555dc..ae2e6a1a303076c4da1e7b00ae6653336a633364 100644
--- a/mlair/data_handler/data_handler_mixed_sampling.py
+++ b/mlair/data_handler/data_handler_mixed_sampling.py
@@ -12,6 +12,7 @@
 from mlair.helpers import remove_items
 from mlair.configuration.defaults import DEFAULT_SAMPLING, DEFAULT_INTERPOLATION_LIMIT, DEFAULT_INTERPOLATION_METHOD
 from mlair.helpers.filter import filter_width_kzf
+import copy
 import inspect
 from typing import Callable
 import datetime as dt
@@ -140,13 +141,6 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi
 
     def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]:
         start, end = self.update_start_end(ind)
-        # if ind == 0:  # for inputs
-        #     estimated_filter_width = self.estimate_filter_width()
-        #     start = self._add_time_delta(self.start, -estimated_filter_width)
-        #     end = self._add_time_delta(self.end, estimated_filter_width)
-        # else:  # target
-        #     start, end = self.start, self.end
-
         vars = [self.variables, self.target_var]
         stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind])
 
@@ -264,8 +258,83 @@ class DataHandlerMixedSamplingWithClimateFirFilter(DataHandlerClimateFirFilter):
 
     data_handler = DataHandlerMixedSamplingWithClimateFirFilterSingleStation
     data_handler_transformation = DataHandlerMixedSamplingWithClimateFirFilterSingleStation
-    _requirements = data_handler.requirements()
+    data_handler_unfiltered = DataHandlerMixedSamplingSingleStation
+    _requirements = list(set(data_handler.requirements() + data_handler_unfiltered.requirements()))
+    DEFAULT_FILTER_ADD_UNFILTERED = False
+
+    def __init__(self, *args, data_handler_class_unfiltered: data_handler_unfiltered = None,
+                 filter_add_unfiltered: bool = DEFAULT_FILTER_ADD_UNFILTERED, **kwargs):
+        self.dh_unfiltered = data_handler_class_unfiltered
+        self.filter_add_unfiltered = filter_add_unfiltered
+        super().__init__(*args, **kwargs)
+    @classmethod
+    def own_args(cls, *args):
+        """Return all arguments (including kwonlyargs)."""
+        super_own_args = DataHandlerClimateFirFilter.own_args(*args)
+        arg_spec = inspect.getfullargspec(cls)
+        list_of_args = arg_spec.args + arg_spec.kwonlyargs + super_own_args
+        return remove_items(list_of_args, ["self"] + list(args))
+
+    def _create_collection(self):
+        if self.filter_add_unfiltered is True and self.dh_unfiltered is not None:
+            return [self.id_class, self.dh_unfiltered]
+        else:
+            return super()._create_collection()
+
+    @classmethod
+    def build(cls, station: str, **kwargs):
+        sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler.requirements() if k in kwargs}
+        filter_add_unfiltered = kwargs.get("filter_add_unfiltered", False)
+        sp_keys = cls.build_update_kwargs(sp_keys, dh_type="filtered")
+        sp = cls.data_handler(station, **sp_keys)
+        if filter_add_unfiltered is True:
+            sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler_unfiltered.requirements() if k in kwargs}
+            sp_keys = cls.build_update_kwargs(sp_keys, dh_type="unfiltered")
+            sp_unfiltered = cls.data_handler_unfiltered(station, **sp_keys)
+        else:
+            sp_unfiltered = None
+        dp_args = {k: copy.deepcopy(kwargs[k]) for k in cls.own_args("id_class") if k in kwargs}
+        return cls(sp, data_handler_class_unfiltered=sp_unfiltered, **dp_args)
+
+    @classmethod
+    def build_update_kwargs(cls, kwargs_dict, dh_type="filtered"):
+        if "transformation" in kwargs_dict:
+            trafo_opts = kwargs_dict.get("transformation")
+            if isinstance(trafo_opts, dict):
+                kwargs_dict["transformation"] = trafo_opts.get(dh_type)
+        return kwargs_dict
+
+    @classmethod
+    def transformation(cls, set_stations, tmp_path=None, **kwargs):
+
+        sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls._requirements if k in kwargs}
+        if "transformation" not in sp_keys.keys():
+            return
+
+        transformation_filtered = super().transformation(set_stations, tmp_path=tmp_path,
+                                                         dh_transformation=cls.data_handler_transformation, **kwargs)
+        if kwargs.get("filter_add_unfiltered", False) is False:
+            return transformation_filtered
+        else:
+            transformation_unfiltered = super().transformation(set_stations, tmp_path=tmp_path,
+                                                               dh_transformation=cls.data_handler_unfiltered, **kwargs)
+            return {"filtered": transformation_filtered, "unfiltered": transformation_unfiltered}
+
+    def get_X_original(self):
+        if self.use_filter_branches is True:
+            X = []
+            for data in self._collection:
+                if hasattr(data, "filter_dim"):
+                    X_total = data.get_X()
+                    filter_dim = data.filter_dim
+                    for filter_name in data.filter_dim_order:
+                        X.append(X_total.sel({filter_dim: filter_name}, drop=True))
+                else:
+                    X.append(data.get_X())
+            return X
+        else:
+            return super().get_X_original()
 
 
 class DataHandlerSeparationOfScalesSingleStation(DataHandlerMixedSamplingWithKzFilterSingleStation):
     """
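
The hunks above key transformation settings by branch: when filter_add_unfiltered is active, the transformation classmethod returns a dict with "filtered" and "unfiltered" entries, and build_update_kwargs picks the entry matching the sub-handler being built (non-dict settings pass through unchanged). A minimal, self-contained sketch of that resolution step, using toy options rather than MLAir defaults and a hypothetical standalone name:

def resolve_transformation(kwargs_dict, dh_type="filtered"):
    # mirrors build_update_kwargs from the hunk above (standalone for illustration)
    if "transformation" in kwargs_dict:
        trafo_opts = kwargs_dict.get("transformation")
        if isinstance(trafo_opts, dict):
            kwargs_dict["transformation"] = trafo_opts.get(dh_type)
    return kwargs_dict

split = {"filtered": {"o3": {"method": "standardise"}},
         "unfiltered": {"o3": {"method": "log"}}}
print(resolve_transformation({"transformation": dict(split)}, "filtered")["transformation"])
# {'o3': {'method': 'standardise'}}
print(resolve_transformation({"transformation": dict(split)}, "unfiltered")["transformation"])
# {'o3': {'method': 'log'}}
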
diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py
index 054713481478826af2c5220f2b9d9e9c08c4a0c2..8e95e76365181ee76f91a91319b912f2626a223a 100644
--- a/mlair/data_handler/data_handler_single_station.py
+++ b/mlair/data_handler/data_handler_single_station.py
@@ -223,7 +223,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
         elif method == "centre":
             return statistics.centre_apply(data, mean), {"mean": mean, "method": method}
         elif method == "min_max":
-            return statistics.min_max_apply(data, min, max), {"min": min, "max": max, "method": method,
+            kws = {"feature_range": feature_range} if feature_range is not None else {}
+            return statistics.min_max_apply(data, min, max, **kws), {"min": min, "max": max, "method": method,
                                                               "feature_range": feature_range}
         elif method == "log":
             return statistics.log_apply(data, mean, std), {"mean": mean, "std": std, "method": method}
@@ -416,8 +417,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
         """
         chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5",
                      "propane", "so2", "toluene"]
-        # used_chem_vars = list(set(chem_vars) & set(self.statistics_per_var.keys()))
-        used_chem_vars = list(set(chem_vars) & set(data.variables.values))
+        used_chem_vars = list(set(chem_vars) & set(data.coords[self.target_dim].values))
         if len(used_chem_vars) > 0:
             data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum)
         return data
@@ -463,11 +463,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
         :return: this array
         """
         ind = pd.DataFrame({'val': index_value}, index=index_value)
-        # res = xr.Dataset.from_dataframe(ind).to_array().rename({'index': index_name}).squeeze(dim=squeeze_dim, drop=True)
         res = xr.Dataset.from_dataframe(ind).to_array(squeeze_dim).rename({'index': index_name}).squeeze(
-            dim=squeeze_dim,
-            drop=True
-        )
+            dim=squeeze_dim, drop=True)
         res.name = index_name
         return res
 
@@ -750,8 +747,6 @@ class DataHandlerSingleStation(AbstractDataHandler):
 
 
 if __name__ == "__main__":
-    # dp = AbstractDataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
-    # print(dp)
     statistics_per_var = {'o3': 'dma8eu', 'temp-rea-miub': 'maximum'}
     sp = DataHandlerSingleStation(data_path='/home/felix/PycharmProjects/mlt_new/data/', station='DEBY122',
                                   statistics_per_var=statistics_per_var, station_type='background',
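
The min_max branch above now forwards feature_range to statistics.min_max_apply only when it was explicitly set, so the helper's own default interval applies otherwise. A standalone sketch of that forwarding pattern (toy re-implementation, not the MLAir helpers):

import numpy as np

def min_max_apply(data, _min, _max, feature_range=(0, 1)):
    # same formula as mlair.helpers.statistics.min_max_apply for the tuple case
    return (data - _min) / (_max - _min) * (max(feature_range) - min(feature_range)) + min(feature_range)

def scale(data, _min, _max, feature_range=None):
    # forward the kwarg only if it was set, as in the hunk above
    kws = {"feature_range": feature_range} if feature_range is not None else {}
    return min_max_apply(data, _min, _max, **kws)

data = np.array([10.0, 15.0, 20.0])
print(scale(data, 10, 20))                         # [0.  0.5 1. ]
print(scale(data, 10, 20, feature_range=(-1, 1)))  # [-1.  0.  1.]
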
diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py
index 785eb7dffff28a676342feace519a6db0871c1df..4707fd580562a68fd6b2dc0843551905e70d7e50 100644
--- a/mlair/data_handler/data_handler_with_filter.py
+++ b/mlair/data_handler/data_handler_with_filter.py
@@ -4,6 +4,7 @@ __author__ = 'Lukas Leufen'
 __date__ = '2020-08-26'
 
 import inspect
+import copy
 import numpy as np
 import pandas as pd
 import xarray as xr
@@ -12,7 +13,7 @@
 from functools import partial
 import logging
 from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation
 from mlair.data_handler import DefaultDataHandler
-from mlair.helpers import remove_items, to_list, TimeTrackingWrapper
+from mlair.helpers import remove_items, to_list, TimeTrackingWrapper, statistics
 from mlair.helpers.filter import KolmogorovZurbenkoFilterMovingWindow as KZFilter
 from mlair.helpers.filter import FIRFilter, ClimateFIRFilter, omega_null_kzf
@@ -126,32 +127,16 @@ class DataHandlerFilter(DefaultDataHandler):
         list_of_args = arg_spec.args + arg_spec.kwonlyargs + super_own_args
         return remove_items(list_of_args, ["self"] + list(args))
 
-    def get_X_original(self):
-        if self.use_filter_branches is True:
-            X = []
-            for data in self._collection:
-                X_total = data.get_X()
-                filter_dim = data.filter_dim
-                for filter_name in data.filter_dim_order:
-                    X.append(X_total.sel({filter_dim: filter_name}, drop=True))
-            return X
-        else:
-            return super().get_X_original()
-
 
 class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
     """Data handler for a single station to be used by a superior data handler. Inputs are FIR filtered."""
 
     _requirements = remove_items(DataHandlerFilterSingleStation.requirements(), "station")
-    _hash = DataHandlerFilterSingleStation._hash + ["filter_cutoff_period", "filter_order", "filter_window_type",
-                                                    "_add_unfiltered"]
+    _hash = DataHandlerFilterSingleStation._hash + ["filter_cutoff_period", "filter_order", "filter_window_type"]
 
     DEFAULT_WINDOW_TYPE = ("kaiser", 5)
-    DEFAULT_ADD_UNFILTERED = False
 
-    def __init__(self, *args, filter_cutoff_period, filter_order, filter_window_type=DEFAULT_WINDOW_TYPE,
-                 filter_add_unfiltered=DEFAULT_ADD_UNFILTERED, **kwargs):
-        # self._check_sampling(**kwargs)
+    def __init__(self, *args, filter_cutoff_period, filter_order, filter_window_type=DEFAULT_WINDOW_TYPE, **kwargs):
         # self.original_data = None  # ToDo: implement here something to store unfiltered data
         self.fs = self._get_fs(**kwargs)
         if filter_window_type == "kzf":
@@ -161,7 +146,7 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
         assert len(self.filter_cutoff_period) == (len(filter_order) - len(removed_index))
         self.filter_order = self._prepare_filter_order(filter_order, removed_index, self.fs)
         self.filter_window_type = filter_window_type
-        self._add_unfiltered = filter_add_unfiltered
+        self.unfiltered_name = "unfiltered"
         super().__init__(*args, **kwargs)
 
     @staticmethod
@@ -223,8 +208,6 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
                         self.filter_window_type, self.target_dim)
         self.fir_coeff = fir.filter_coefficients()
         fir_data = fir.filtered_data()
-        if self._add_unfiltered is True:
-            fir_data.append(self.input_data)
         self.input_data = xr.concat(fir_data, pd.Index(self.create_filter_index(), name=self.filter_dim))
         # this is just a code snippet to check the results of the kz filter
         # import matplotlib
@@ -249,8 +232,6 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
             else:
                 index.append(f"band{band_num}")
                 band_num += 1
-        if self._add_unfiltered:
-            index.append("unfiltered")
         self.filter_dim_order = index
         return pd.Index(index, name=self.filter_dim)
 
@@ -261,13 +242,89 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
         _data, _meta, _input_data, _target_data, self.fir_coeff, self.filter_dim_order = lazy_data
         super(__class__, self)._extract_lazy((_data, _meta, _input_data, _target_data))
 
+    def transform(self, data_in, dim: Union[str, int] = 0, inverse: bool = False, opts=None,
+                  transformation_dim=None):
+        """
+        Transform data according to given transformation settings.
+
+        This function transforms an xarray.DataArray (along dim) or pandas.DataFrame (along axis) either with mean=0
+        and std=1 (`method=standardise`) or centers the data with mean=0 and no change in data scale
+        (`method=centre`). Furthermore, this sets an internal instance attribute for later inverse transformation. This
+        method will raise an AssertionError if an internal transform method was already set ('inverse=False') or if the
+        internal transform method, internal mean and internal standard deviation weren't set ('inverse=True').
+
+        :param string/int dim: This param is not used for inverse transformation.
+            | for xarray.DataArray as string: name of dimension which should be standardised
+            | for pandas.DataFrame as int: axis of dimension which should be standardised
+        :param inverse: Switch between transformation and inverse transformation.
+
+        :return: xarray.DataArrays or pandas.DataFrames:
+            #. mean: Mean of data
+            #. std: Standard deviation of data
+            #. data: Standardised data
+        """
+
+        if transformation_dim is None:
+            transformation_dim = self.DEFAULT_TARGET_DIM
+
+        def f(data, method="standardise", feature_range=None):
+            if method == "standardise":
+                return statistics.standardise(data, dim)
+            elif method == "centre":
+                return statistics.centre(data, dim)
+            elif method == "min_max":
+                kwargs = {"feature_range": feature_range} if feature_range is not None else {}
+                return statistics.min_max(data, dim, **kwargs)
+            elif method == "log":
+                return statistics.log(data, dim)
+            else:
+                raise NotImplementedError
+
+        def f_apply(data, method, **kwargs):
+            for k, v in kwargs.items():
+                if not (isinstance(v, xr.DataArray) or v is None):
+                    _, opts = statistics.min_max(data, dim)
+                    helper = xr.ones_like(opts['min'])
+                    kwargs[k] = helper * v
+            mean = kwargs.pop('mean', None)
+            std = kwargs.pop('std', None)
+            min = kwargs.pop('min', None)
+            max = kwargs.pop('max', None)
+            feature_range = kwargs.pop('feature_range', None)
+
+            if method == "standardise":
+                return statistics.standardise_apply(data, mean, std), {"mean": mean, "std": std, "method": method}
+            elif method == "centre":
+                return statistics.centre_apply(data, mean), {"mean": mean, "method": method}
+            elif method == "min_max":
+                kws = {"feature_range": feature_range} if feature_range is not None else {}
+                return statistics.min_max_apply(data, min, max, **kws), {"min": min, "max": max, "method": method,
+                                                                         "feature_range": feature_range}
+            elif method == "log":
+                return statistics.log_apply(data, mean, std), {"mean": mean, "std": std, "method": method}
+            else:
+                raise NotImplementedError
+
+        opts = opts or {}
+        opts_updated = {}
+        if not inverse:
+            transformed_values = []
+            for var in data_in.variables.values:
+                data_var = data_in.sel(**{transformation_dim: [var]})
+                var_opts = opts.get(var, {})
+                _apply = (var_opts.get("mean", None) is not None) or (var_opts.get("min") is not None)
+                values, new_var_opts = locals()["f_apply" if _apply else "f"](data_var, **var_opts)
+                opts_updated[var] = copy.deepcopy(new_var_opts)
+                transformed_values.append(values)
+            return xr.concat(transformed_values, dim=transformation_dim), opts_updated
+        else:
+            return self.inverse_transform(data_in, opts, transformation_dim)
+
 
 class DataHandlerFirFilter(DataHandlerFilter):
     """Data handler using FIR filtered data."""
 
     data_handler = DataHandlerFirFilterSingleStation
     data_handler_transformation = DataHandlerFirFilterSingleStation
-    _requirements = data_handler.requirements()
 
 
 class DataHandlerKzFilterSingleStation(DataHandlerFilterSingleStation):
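
The transform method added above dispatches per variable: f estimates statistics from the data itself, while f_apply re-uses externally supplied constants, first broadcasting plain scalars to DataArrays via xr.ones_like. A toy version of that switch, reduced to standardise and with made-up dimension names:

import numpy as np
import xarray as xr

data = xr.DataArray(np.arange(6, dtype=float).reshape(3, 2),
                    dims=("datetime", "variables"),
                    coords={"variables": ["o3", "temp"]})
opts = {"o3": {},                           # no stats supplied -> estimate (f)
        "temp": {"mean": 2.0, "std": 1.0}}  # stats supplied -> apply (f_apply)

transformed = []
for var in data.coords["variables"].values:
    d = data.sel(variables=[var])
    var_opts = opts[var]
    if var_opts.get("mean") is not None:  # mirrors the `_apply` flag above
        mean, std = var_opts["mean"], var_opts["std"]
    else:
        mean, std = d.mean(), d.std()
    transformed.append((d - mean) / std)
print(xr.concat(transformed, dim="variables").shape)  # (3, 2)
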
@@ -396,12 +453,6 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
         input_data = xr.concat(climate_filter_data, pd.Index(self.create_filter_index(add_unfiltered_index=False),
                                                              name=self.filter_dim))
 
-        # add unfiltered raw data
-        if self._add_unfiltered is True:
-            data_raw = self.shift(self.input_data, self.time_dim, -self.window_history_size)
-            data_raw = data_raw.expand_dims({self.filter_dim: ["unfiltered"]}, -1)
-            input_data = xr.concat([input_data, data_raw], self.filter_dim)
-
         self.input_data = input_data
 
         # this is just a code snippet to check the results of the filter
@@ -422,8 +473,6 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
         f = lambda x: int(np.round(x)) if x >= 10 else np.round(x, 1)
         index = list(map(f, index.tolist()))
         index = list(map(lambda x: str(x) + "d", index)) + ["res"]
-        if self._add_unfiltered and add_unfiltered_index:
-            index.append("unfiltered")
         self.filter_dim_order = index
         return pd.Index(index, name=self.filter_dim)
 
@@ -492,11 +541,3 @@ class DataHandlerClimateFirFilter(DataHandlerFilter):
 
     _requirements = data_handler.requirements()
     _store_attributes = data_handler.store_attributes()
-
-    # def get_X_original(self):
-    #     X = []
-    #     for data in self._collection:
-    #         X_total = data.get_X()
-    #         filter_dim = data.filter_dim
-    #         for filter in data.filter_dim_order:
-    #             X.append(X_total.sel({filter_dim: filter}, drop=True))
-    #     return X
diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py
index 8ad3e1e7ff583bd511d6311f2ab9de886f440fc9..a02ad89c910b8d898778a748db0e68b3f75fd5f1 100644
--- a/mlair/data_handler/default_data_handler.py
+++ b/mlair/data_handler/default_data_handler.py
@@ -53,6 +53,7 @@ class DefaultDataHandler(AbstractDataHandler):
         self._Y = None
         self._X_extreme = None
         self._Y_extreme = None
+        self._data_intersection = None
         self._use_multiprocessing = use_multiprocessing
         self._max_number_multiprocessing = max_number_multiprocessing
         _name_affix = str(f"{str(self.id_class)}_{name_affix}" if name_affix is not None else id(self))
@@ -172,10 +173,15 @@ class DefaultDataHandler(AbstractDataHandler):
         else:
             X = list(map(lambda x: x.sel({dim: intersect}), X_original))
             Y = Y_original.sel({dim: intersect})
+        self._data_intersection = intersect
         self._X, self._Y = X, Y
 
     def get_observation(self):
-        return self.id_class.observation.copy().squeeze()
+        dim = self.time_dim
+        if self._data_intersection is not None:
+            return self.id_class.observation.sel({dim: self._data_intersection}).copy().squeeze()
+        else:
+            return self.id_class.observation.copy().squeeze()
 
     def apply_transformation(self, data, base="target", dim=0, inverse=False):
         return self.id_class.apply_transformation(data, dim=dim, base=base, inverse=inverse)
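
get_observation now honours the sample intersection computed in harmonise_X: once _data_intersection is set, observations are restricted to the same time steps as X and Y. The effect in isolation, with toy data (the real code selects along self.time_dim):

import numpy as np
import pandas as pd
import xarray as xr

time = pd.date_range("2006-01-01", periods=5, freq="D")
observation = xr.DataArray(np.arange(5.0), coords={"datetime": time}, dims="datetime")
intersect = time[1:4]  # time steps for which both X and Y exist
print(observation.sel({"datetime": intersect}).values)  # [1. 2. 3.]
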
""" + if dh_transformation is None: + dh_transformation = cls.data_handler_transformation - sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls._requirements if k in kwargs} + sp_keys = {k: copy.deepcopy(kwargs[k]) for k in dh_transformation.requirements() if k in kwargs} if "transformation" not in sp_keys.keys(): return transformation_dict = ({}, {}) - def _inner(): - """Inner method that is performed in both serial and parallel approach.""" - if dh is not None: - for i, transformation in enumerate(dh._transformation): - for var in transformation.keys(): - if var not in transformation_dict[i].keys(): - transformation_dict[i][var] = {} - opts = transformation[var] - if not transformation_dict[i][var].get("method", opts["method"]) == opts["method"]: - # data handlers with filters are allowed to change transformation method to standardise - assert hasattr(dh, "filter_dim") and opts["method"] == "standardise" - transformation_dict[i][var]["method"] = opts["method"] - for k in ["mean", "std", "min", "max"]: - old = transformation_dict[i][var].get(k, None) - new = opts.get(k) - transformation_dict[i][var][k] = new if old is None else old.combine_first(new) - if "feature_range" in opts.keys(): - transformation_dict[i][var]["feature_range"] = opts.get("feature_range", None) - max_process = kwargs.get("max_number_multiprocessing", 16) n_process = min([psutil.cpu_count(logical=False), len(set_stations), max_process]) # use only physical cpus if n_process > 1 and kwargs.get("use_multiprocessing", True) is True: # parallel solution @@ -311,24 +300,29 @@ class DefaultDataHandler(AbstractDataHandler): logging.info(f"running {getattr(pool, '_processes')} processes in parallel") sp_keys.update({"tmp_path": tmp_path, "return_strategy": "reference"}) output = [ - pool.apply_async(f_proc, args=(cls.data_handler_transformation, station), kwds=sp_keys) + pool.apply_async(f_proc, args=(dh_transformation, station), kwds=sp_keys) for station in set_stations] for p in output: _res_file, s = p.get() with open(_res_file, "rb") as f: dh = dill.load(f) os.remove(_res_file) - _inner() + transformation_dict = cls.update_transformation_dict(dh, transformation_dict) pool.close() else: # serial solution logging.info("use serial transformation approach") sp_keys.update({"return_strategy": "result"}) for station in set_stations: - dh, s = f_proc(cls.data_handler_transformation, station, **sp_keys) - _inner() + dh, s = f_proc(dh_transformation, station, **sp_keys) + transformation_dict = cls.update_transformation_dict(dh, transformation_dict) # aggregate all information iter_dim = sp_keys.get("iter_dim", cls.DEFAULT_ITER_DIM) + transformation_dict = cls.aggregate_transformation(transformation_dict, iter_dim) + return transformation_dict + + @classmethod + def aggregate_transformation(cls, transformation_dict, iter_dim): pop_list = [] for i, transformation in enumerate(transformation_dict): for k in transformation.keys(): @@ -349,6 +343,27 @@ class DefaultDataHandler(AbstractDataHandler): transformation_dict[i].pop(k) return transformation_dict + @classmethod + def update_transformation_dict(cls, dh, transformation_dict): + """Inner method that is performed in both serial and parallel approach.""" + if dh is not None: + for i, transformation in enumerate(dh._transformation): + for var in transformation.keys(): + if var not in transformation_dict[i].keys(): + transformation_dict[i][var] = {} + opts = transformation[var] + if not transformation_dict[i][var].get("method", opts["method"]) == opts["method"]: + # data handlers with 
diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py
index b71caa681b5e1823ce55fbb3ee7d7afb77df7dba..543cff3624577ac617733b8b593c5f52f25196b3 100644
--- a/mlair/helpers/filter.py
+++ b/mlair/helpers/filter.py
@@ -384,7 +384,7 @@ class ClimateFIRFilter:
         _end = pd.to_datetime(data.coords[time_dim].max().values).year
         filt_coll = []
         for _year in range(_start, _end + 1):
-            logging.info(f"{data.coords['Stations'].values[0]} ({var}): year={_year}")
+            logging.debug(f"{data.coords['Stations'].values[0]} ({var}): year={_year}")
 
             time_slice = self._create_time_range_extend(_year, sampling, extend_length_history)
             d = data.sel({var_dim: [var], time_dim: time_slice})
diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py
index fef52fb27d602b5931587ff0fa2d8edd7e0c2d8f..87f780f9a6133edfcb2f9c71c2956b92f332e915 100644
--- a/mlair/helpers/statistics.py
+++ b/mlair/helpers/statistics.py
@@ -152,7 +152,10 @@ def min_max_apply(data: Data, _min: Data, _max: Data, feature_range: Data = (0,
     :param feature_range: scale data to any interval given in feature range. Default is scaling on interval [0, 1].
     :return: min/max scaled data
     """
-    return (data - _min) / (_max - _min) * (max(feature_range) - min(feature_range)) + min(feature_range)
+    if not isinstance(feature_range, xr.DataArray):
+        return (data - _min) / (_max - _min) * (max(feature_range) - min(feature_range)) + min(feature_range)
+    else:
+        return (data - _min) / (_max - _min) * (feature_range.max() - feature_range.min()) + feature_range.min()
 
 
 def log(data: Data, dim: Union[str, int]) -> Tuple[Data, Dict[(str, Data)]]:
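
min_max_apply now accepts feature_range either as a plain sequence or as an xr.DataArray (the form produced when f_apply broadcasts scalars with xr.ones_like). Both branches scale identically; a quick toy check:

import numpy as np
import xarray as xr

def min_max_apply(data, _min, _max, feature_range=(0, 1)):
    # copy of the patched helper, repeated here so the snippet is self-contained
    if not isinstance(feature_range, xr.DataArray):
        return (data - _min) / (_max - _min) * (max(feature_range) - min(feature_range)) + min(feature_range)
    else:
        return (data - _min) / (_max - _min) * (feature_range.max() - feature_range.min()) + feature_range.min()

data = xr.DataArray([10.0, 15.0, 20.0], dims="datetime")
as_tuple = min_max_apply(data, 10, 20, (-1, 1))
as_array = min_max_apply(data, 10, 20, xr.DataArray([-1.0, 1.0], dims="feature_range"))
print(np.allclose(as_tuple, as_array))  # True
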
diff --git a/run_climate_filter.py b/run_climate_filter.py
index 4aacab8817b2f6350de861ef383b4777790bc57c..e6cae785f817c11cff2fe5316ff0a405807d3557 100755
--- a/run_climate_filter.py
+++ b/run_climate_filter.py
@@ -5,6 +5,8 @@ import argparse
 
 from mlair.workflows import DefaultWorkflow
 from mlair.data_handler.data_handler_mixed_sampling import DataHandlerMixedSamplingWithClimateFirFilter
+from mlair.model_modules.fully_connected_networks import BranchedInputFCN as NN
+
 
 stats = {'o3': 'dma8eu', 'no': 'dma8eu', 'no2': 'dma8eu',
         'relhum': 'average_values', 'u': 'average_values', 'v': 'average_values',
@@ -19,25 +21,45 @@ data_origin = {'o3': '', 'no': '', 'no2': '',
 
 
 def main(parser_args):
     args = dict(stations=["DEBW107", "DEBW013"],
                 network="UBA",
-                evaluate_bootstraps=False, plot_list=[],
+                evaluate_bootstraps=False, plot_list=["PlotDataHistogram"],
                 data_origin=data_origin, data_handler=DataHandlerMixedSamplingWithClimateFirFilter,
                 interpolation_limit=(3, 1), overwrite_local_data=False,
+                overwrite_lazy_data=False,
                 lazy_preprocessing=True,
                 use_multiprocessing=True,
-                use_multiprocessing_on_debug=True,
+                use_multiprocessing_on_debug=False,
                 sampling=("hourly", "daily"),
                 statistics_per_var=stats,
                 create_new_model=True, train_model=False, epochs=1,
-                window_history_size=6 * 24 + 16,
+                # window_history_size=6 * 24 + 16,
+                window_history_size=2 * 24 + 16,
                 window_history_offset=16,
                 kz_filter_length=[100 * 24, 15 * 24],
                 kz_filter_iter=[4, 5],
-                filter_cutoff_period=[7, 0.8],
-                filter_order=[7, 2],
+                # filter_cutoff_period=[7, 0.8],
+                # filter_order=[7, 2],
+                # filter_cutoff_period=[21, 2.7, 11/24.],
+                # filter_order=[42, 7, 2],
+                filter_cutoff_period=[3],
+                filter_order=[7],
+                # extreme_values=[1.5, 1.75, 2, 2.25, 2.5, 3],
+                filter_add_unfiltered=True,
                 start="2006-01-01", train_start="2006-01-01",
                 end="2011-12-31", test_end="2011-12-31",
+                model=NN,
+                use_filter_branches=True,
+                transformation={
+                    "o3": {"method": "log"},
+                    "no": {"method": "log"},
+                    "no2": {"method": "log"},
+                    "cloudcover": {"method": "min_max"},
+                    "pblheight": {"method": "log"},
+                    "relhum": {"method": "min_max"},
+                    "temp": {"method": "standardise"},
+                    "u": {"method": "standardise"},
+                    "v": {"method": "standardise"},
+                },  # T1F, also apply same target transformation
                 **parser_args.__dict__,
                 )
     workflow = DefaultWorkflow(**args, start_script=__file__)