diff --git a/mlair/data_handler/data_handler_kz_filter.py b/mlair/data_handler/data_handler_kz_filter.py
new file mode 100644
index 0000000000000000000000000000000000000000..943f3af390cba7ade4a77fe5aaf8e87a984abff3
--- /dev/null
+++ b/mlair/data_handler/data_handler_kz_filter.py
@@ -0,0 +1,91 @@
+"""Data Handler using kz-filtered data."""
+
+__author__ = 'Lukas Leufen'
+__date__ = '2020-08-26'
+
+import inspect
+import numpy as np
+import pandas as pd
+import xarray as xr
+from typing import List
+
+from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation
+from mlair.data_handler import DefaultDataHandler
+from mlair.helpers import remove_items, to_list, TimeTrackingWrapper
+from mlair.helpers.statistics import KolmogorovZurbenkoFilterMovingWindow as KZFilter
+
+
+class DataHandlerKzFilterSingleStation(DataHandlerSingleStation):
+    """Data handler for a single station to be used by a superior data handler. Data is kz filtered."""
+
+    _requirements = remove_items(inspect.getfullargspec(DataHandlerSingleStation).args, ["self", "station"])
+
+    def __init__(self, *args, kz_filter_length, kz_filter_iter, **kwargs):
+        assert kwargs.get("sampling") == "hourly"  # This data handler requires hourly data resolution
+        kz_filter_length = to_list(kz_filter_length)
+        kz_filter_iter = to_list(kz_filter_iter)
+        # self.original_data = None  # ToDo: implement here something to store unfiltered data
+        self.kz_filter_length = kz_filter_length
+        self.kz_filter_iter = kz_filter_iter
+        self.cutoff_period = None
+        self.cutoff_period_days = None
+        super().__init__(*args, **kwargs)
+
+    def setup_samples(self):
+        """
+        Setup samples. This method prepares and creates samples X, and labels Y.
+        """
+        self.load_data()
+        self.interpolate(dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit)
+        import matplotlib
+        matplotlib.use("TkAgg")
+        import matplotlib.pyplot as plt
+        # self.original_data = self.data  # ToDo: implement here something to store unfiltered data
+        self.apply_kz_filter()
+        self.data.sel(filter="74d", variables="temp", Stations="DEBW107").plot()
+        if self.transformation is not None:
+            self.call_transform()
+        self.make_samples()  # ToDo: target samples are still coming from filtered data
+
+    @TimeTrackingWrapper
+    def apply_kz_filter(self):
+        kz = KZFilter(self.data, wl=self.kz_filter_length, itr=self.kz_filter_iter, filter_dim="datetime")
+        filtered_data: List[xr.DataArray] = kz.run()
+        self.cutoff_period = kz.period_null()
+        self.cutoff_period_days = kz.period_null_days()
+        self.data = xr.concat(filtered_data, pd.Index(self.create_filter_index(), name="filter"))
+
+    def create_filter_index(self) -> pd.Index:
+        """
+        Round cut-off periods in days and append 'res' for the residuum index.
+
+        Round small numbers (<10) to a single decimal and higher numbers to int. Transform into a list of str and
+        append 'res' for the residuum index.
+        """
+        index = np.round(self.cutoff_period_days, 1)
+        f = lambda x: int(np.round(x)) if x >= 10 else np.round(x, 1)
+        index = list(map(f, index.tolist()))
+        index = list(map(lambda x: str(x) + "d", index)) + ["res"]
+        return pd.Index(index, name="filter")
+
+    def get_transposed_history(self) -> xr.DataArray:
+        """Return history.
+
+        :return: history with dimensions datetime, window, Stations, variables, filter.
+        """
+        return self.history.transpose("datetime", "window", "Stations", "variables", "filter").copy()
+
+    def get_transposed_label(self) -> xr.DataArray:
+        """Return label.
+
+        :return: label with dimensions datetime*, window*, Stations, variables.
+        """
+        return self.label.squeeze("Stations").transpose("datetime", "window", "filter").copy()
+
+
+class DataHandlerKzFilter(DefaultDataHandler):
+    """Data handler using kz filtered data."""
+
+    data_handler = DataHandlerKzFilterSingleStation
+    data_handler_transformation = DataHandlerKzFilterSingleStation
+    _requirements = data_handler.requirements()
diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py
index f6206e98516a053c2f81f45d9d74b2939bd36e3c..8faf9fd38006b595c38c35c4f46be1cc8ffd3626 100644
--- a/mlair/helpers/statistics.py
+++ b/mlair/helpers/statistics.py
@@ -357,7 +357,7 @@ class KolmogorovZurbenkoBaseClass:
         It create the variables associate with the Kolmogorov-Zurbenko-filter.

         Args:
-            df(pd.DataFrame): time series of a variable
+            df(pd.DataFrame, None): time series of a variable
             wl(list of int): window length
             itr(list of int): number of iteration
         """
@@ -373,27 +373,25 @@ class KolmogorovZurbenkoBaseClass:

     def set_child(self):
         if len(self.wl) > 1:
-            return KolmogorovZurbenkoBaseClass(self.df, self.wl[1:], self.itr[1:], True, self.filter_dim)
+            return KolmogorovZurbenkoBaseClass(None, self.wl[1:], self.itr[1:], True, self.filter_dim)
         else:
             return None

-    def kz_filter(self, m, k):
+    def kz_filter(self, df, m, k):
         pass

     def spectral_calc(self):
-        kz = self.kz_filter(self.wl[0], self.itr[0])
+        df_start = self.df
+        kz = self.kz_filter(df_start, self.wl[0], self.itr[0])
+        filtered = self.subtract(df_start, kz)
+        # case I: no child avail -> return kz and remaining
         if self.child is None:
-            if not self._isChild:
-                return [self.subtract(self.df, kz), kz]
-            else:
-                return [kz, kz]
+            return [kz, filtered]
+        # case II: has child -> return current kz and all child results
         else:
-            if not self._isChild:
-                kz_next = self.child.spectral_calc()
-                return [self.subtract(self.df, kz), self.subtract(kz, kz_next[0])] + kz_next[1:]
-            else:
-                kz_next = self.child.spectral_calc()
-                return [kz, self.subtract(kz, kz_next[0])] + kz_next[1:]
+            self.child.df = filtered
+            kz_next = self.child.spectral_calc()
+            return [kz] + kz_next

     @staticmethod
     def subtract(minuend, subtrahend):
@@ -413,9 +411,9 @@

     def omega_null(self, alpha=0.5):
         a = np.sqrt(6) / np.pi
-        b = 1 / (2 * self.itr[0])
+        b = 1 / (2 * np.array(self.itr))
         c = 1 - alpha ** b
-        d = self.wl[0] ** 2 - alpha ** b
+        d = np.array(self.wl) ** 2 - alpha ** b
         return a * np.sqrt(c / d)

     def period_null(self, alpha=0.5):
@@ -482,7 +480,7 @@ class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass):
         else:
             return None

-    def kz_filter(self, wl, itr):
+    def kz_filter(self, df, wl, itr):
         """
         It passes the low frequency time series.

@@ -490,24 +488,28 @@ class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass):
             wl(int): a window length
             itr(int): a number of iteration
         """
-        df_itr = self.df.__deepcopy__()
+        df_itr = df.__deepcopy__()
         try:
             kwargs = {"min_periods": 1,
                       "center": True,
                       self.filter_dim: wl}
-            for _ in np.arange(0, itr):
-                rolling = df_itr.rolling(**kwargs)
-                if self.method == "median":
-                    df_mv_avg_tmp = rolling.median()
-                elif self.method == "percentile":
-                    df_mv_avg_tmp = rolling.quantile(self.percentile)
-                elif self.method == "max":
-                    df_mv_avg_tmp = rolling.max()
-                elif self.method == "min":
-                    df_mv_avg_tmp = rolling.min()
-                else:
-                    df_mv_avg_tmp = rolling.mean()
-                df_itr = df_mv_avg_tmp
+            iter_vars = df_itr.coords["variables"].values
+            for var in iter_vars:
+                df_itr_var = df_itr.sel(variables=[var]).chunk()
+                for _ in np.arange(0, itr):
+                    rolling = df_itr_var.rolling(**kwargs)
+                    if self.method == "median":
+                        df_mv_avg_tmp = rolling.median()
+                    elif self.method == "percentile":
+                        df_mv_avg_tmp = rolling.quantile(self.percentile)
+                    elif self.method == "max":
+                        df_mv_avg_tmp = rolling.max()
+                    elif self.method == "min":
+                        df_mv_avg_tmp = rolling.min()
+                    else:
+                        df_mv_avg_tmp = rolling.mean()
+                    df_itr_var = df_mv_avg_tmp.compute()
+                df_itr = df_itr.drop_sel(variables=var).combine_first(df_itr_var)
             return df_itr
         except ValueError:
             raise ValueError
diff --git a/requirements.txt b/requirements.txt
index be76eab5b74797b039682a292ae8890488c058ec..371bb776e581925e507bf06c60bd866061c52791 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -61,7 +61,7 @@ typing-extensions
 urllib3==1.25.8
 wcwidth==0.1.8
 Werkzeug==1.0.0
-xarray==0.15.0
+xarray==0.16.1
 zipp==3.1.0

 setuptools~=49.6.0
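
Note: the following is a minimal, standalone sketch (plain numpy/pandas, not MLAir code; the helper names kz_filter_1d, kz_decompose and cutoff_period_days are illustrative) of what the reworked spectral_calc and the vectorized omega_null compute. Each stage applies an itr-times iterated centered moving average of width wl to the residual of the previous stage, so the result is a list [kz_1, ..., kz_n, residual] whose components sum back to the original series. The cutoff periods are the values that create_filter_index rounds into labels such as "74d"; the conversion to days assumes hourly data, as enforced by the handler's sampling assert.

# Standalone sketch of the KZ decomposition and cutoff periods; not the MLAir implementation.
import numpy as np
import pandas as pd


def kz_filter_1d(series: pd.Series, wl: int, itr: int) -> pd.Series:
    """Iterated centered moving average, i.e. the plain (mean-based) Kolmogorov-Zurbenko filter."""
    filtered = series
    for _ in range(itr):
        filtered = filtered.rolling(window=wl, min_periods=1, center=True).mean()
    return filtered


def kz_decompose(series: pd.Series, wl: list, itr: list) -> list:
    """Return [kz_1, ..., kz_n, residual], mirroring the child recursion in spectral_calc."""
    components, residual = [], series
    for window, iterations in zip(wl, itr):
        low_pass = kz_filter_1d(residual, window, iterations)
        components.append(low_pass)
        residual = residual - low_pass  # this residual is what gets handed to the child filter
    return components + [residual]


def cutoff_period_days(wl, itr, alpha=0.5, samples_per_day=24):
    """Cutoff period per filter stage in days (hourly data assumed), as in omega_null/period_null."""
    wl, itr = np.array(wl), np.array(itr)
    b = 1 / (2 * itr)
    omega_null = (np.sqrt(6) / np.pi) * np.sqrt((1 - alpha ** b) / (wl ** 2 - alpha ** b))
    return (1 / omega_null) / samples_per_day


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    hours = np.arange(2 * 365 * 24)
    series = pd.Series(np.sin(2 * np.pi * hours / (365 * 24)) + 0.1 * rng.standard_normal(hours.size))
    parts = kz_decompose(series, wl=[744, 24], itr=[1, 1])
    print(np.allclose(sum(parts), series))  # the components add up to the original series
    # e.g. a 744 h (monthly) window applied once gives roughly 73.5 days, which
    # create_filter_index would label "74d"; the 24 h window gives roughly 2.4 days ("2.4d").
    print(np.round(cutoff_period_days([744, 24], [1, 1]), 1))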