diff --git a/mlair/data_handler/data_handler_kz_filter.py b/mlair/data_handler/data_handler_kz_filter.py deleted file mode 100644 index 539712b39e51c32203e1c55e28ce2eff24069479..0000000000000000000000000000000000000000 --- a/mlair/data_handler/data_handler_kz_filter.py +++ /dev/null @@ -1,114 +0,0 @@ -"""Data Handler using kz-filtered data.""" - -__author__ = 'Lukas Leufen' -__date__ = '2020-08-26' - -import inspect -import numpy as np -import pandas as pd -import xarray as xr -from typing import List, Union, Tuple, Optional -from functools import partial - -from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation -from mlair.data_handler import DefaultDataHandler -from mlair.helpers import remove_items, to_list, TimeTrackingWrapper -from mlair.helpers.statistics import KolmogorovZurbenkoFilterMovingWindow as KZFilter - -# define a more general date type for type hinting -str_or_list = Union[str, List[str]] - - -class DataHandlerKzFilterSingleStation(DataHandlerSingleStation): - """Data handler for a single station to be used by a superior data handler. Inputs are kz filtered.""" - - _requirements = remove_items(inspect.getfullargspec(DataHandlerSingleStation).args, ["self", "station"]) - _hash = DataHandlerSingleStation._hash + ["kz_filter_length", "kz_filter_iter", "filter_dim"] - - DEFAULT_FILTER_DIM = "filter" - - def __init__(self, *args, kz_filter_length, kz_filter_iter, filter_dim=DEFAULT_FILTER_DIM, **kwargs): - self._check_sampling(**kwargs) - # self.original_data = None # ToDo: implement here something to store unfiltered data - self.kz_filter_length = to_list(kz_filter_length) - self.kz_filter_iter = to_list(kz_filter_iter) - self.filter_dim = filter_dim - self.cutoff_period = None - self.cutoff_period_days = None - super().__init__(*args, **kwargs) - - def setup_transformation(self, transformation: Union[None, dict, Tuple]) -> Tuple[Optional[dict], Optional[dict]]: - """ - Adjust setup of transformation because kfz filtered data will have negative values which is not compatible with - the log transformation. Therefore, replace all log transformation methods by a default standardization. This is - only applied on input side. - """ - transformation = super(__class__, self).setup_transformation(transformation) - if transformation[0] is not None: - for k, v in transformation[0].items(): - if v["method"] == "log": - transformation[0][k]["method"] = "standardise" - return transformation - - def _check_sampling(self, **kwargs): - assert kwargs.get("sampling") == "hourly" # This data handler requires hourly data resolution - - def make_input_target(self): - data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, - self.station_type, self.network, self.store_data_locally, self.data_origin) - self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, - limit=self.interpolation_limit) - self.set_inputs_and_targets() - self.apply_kz_filter() - # this is just a code snippet to check the results of the kz filter - # import matplotlib - # matplotlib.use("TkAgg") - # import matplotlib.pyplot as plt - # self.input_data.sel(filter="74d", variables="temp", Stations="DEBW107").plot() - # self.input_data.sel(variables="temp", Stations="DEBW107").plot.line(hue="filter") - - @TimeTrackingWrapper - def apply_kz_filter(self): - """Apply kolmogorov zurbenko filter only on inputs.""" - kz = KZFilter(self.input_data, wl=self.kz_filter_length, itr=self.kz_filter_iter, filter_dim=self.time_dim) - filtered_data: List[xr.DataArray] = kz.run() - self.cutoff_period = kz.period_null() - self.cutoff_period_days = kz.period_null_days() - self.input_data = xr.concat(filtered_data, pd.Index(self.create_filter_index(), name=self.filter_dim)) - - def create_filter_index(self) -> pd.Index: - """ - Round cut off periods in days and append 'res' for residuum index. - - Round small numbers (<10) to single decimal, and higher numbers to int. Transform as list of str and append - 'res' for residuum index. - """ - index = np.round(self.cutoff_period_days, 1) - f = lambda x: int(np.round(x)) if x >= 10 else np.round(x, 1) - index = list(map(f, index.tolist())) - index = list(map(lambda x: str(x) + "d", index)) + ["res"] - return pd.Index(index, name=self.filter_dim) - - def get_transposed_history(self) -> xr.DataArray: - """Return history. - - :return: history with dimensions datetime, window, Stations, variables, filter. - """ - return self.history.transpose(self.time_dim, self.window_dim, self.iter_dim, self.target_dim, - self.filter_dim).copy() - - def _create_lazy_data(self): - return [self._data, self.meta, self.input_data, self.target_data, self.cutoff_period, self.cutoff_period_days] - - def _extract_lazy(self, lazy_data): - _data, self.meta, _input_data, _target_data, self.cutoff_period, self.cutoff_period_days = lazy_data - f_prep = partial(self._slice_prep, start=self.start, end=self.end) - self._data, self.input_data, self.target_data = list(map(f_prep, [_data, _input_data, _target_data])) - - -class DataHandlerKzFilter(DefaultDataHandler): - """Data handler using kz filtered data.""" - - data_handler = DataHandlerKzFilterSingleStation - data_handler_transformation = DataHandlerKzFilterSingleStation - _requirements = data_handler.requirements() diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index a10364333f3671448c560b40283fb2645d251428..4c84866b4342d5ae75e9cded3dc3cab969ffdac5 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -2,7 +2,7 @@ __author__ = 'Lukas Leufen' __date__ = '2020-11-05' from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation -from mlair.data_handler.data_handler_kz_filter import DataHandlerKzFilterSingleStation +from mlair.data_handler.data_handler_with_filter import DataHandlerKzFilterSingleStation from mlair.data_handler import DefaultDataHandler from mlair import helpers from mlair.helpers import remove_items diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py new file mode 100644 index 0000000000000000000000000000000000000000..6a37a44713a963f5cc914aa1754c0cde24071be5 --- /dev/null +++ b/mlair/data_handler/data_handler_with_filter.py @@ -0,0 +1,258 @@ +"""Data Handler using kz-filtered data.""" + +__author__ = 'Lukas Leufen' +__date__ = '2020-08-26' + +import inspect +import numpy as np +import pandas as pd +import xarray as xr +from typing import List, Union, Tuple, Optional +from functools import partial + +from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation +from mlair.data_handler import DefaultDataHandler +from mlair.helpers import remove_items, to_list, TimeTrackingWrapper +from mlair.helpers.filter import KolmogorovZurbenkoFilterMovingWindow as KZFilter +from mlair.helpers.filter import FIRFilter + +# define a more general date type for type hinting +str_or_list = Union[str, List[str]] + + +# cutoff_p = [(None, 14), (8, 6), (2, 0.8), (0.8, None)] +# cutoff = list(map(lambda x: (1. / x[0] if x[0] is not None else None, 1. / x[1] if x[1] is not None else None), cutoff_p)) +# fs = 24. +# # order = int(60 * fs) + 1 +# order = np.array([int(14 * fs) + 1, int(14 * fs) + 1, int(4 * fs) + 1, int(2 * fs) + 1]) +# print("cutoff period", cutoff_p) +# print("cutoff", cutoff) +# print("fs", fs) +# print("order", order) +# print("delay", 0.5 * (order-1) / fs) +# window = ("kaiser", 5) +# # low pass +# y, h = fir_filter(station_data.values.flatten(), fs, order[0], cutoff_low = cutoff[0][0], cutoff_high = cutoff[0][1], window=window) +# filtered = xr.ones_like(station_data) * y.reshape(station_data.values.shape) + +class DataHandlerFirFilterSingleStation(DataHandlerSingleStation): + """Data handler for a single station to be used by a superior data handler. Inputs are FIR filtered.""" + + _requirements = remove_items(inspect.getfullargspec(DataHandlerSingleStation).args, ["self", "station"]) + _hash = DataHandlerSingleStation._hash + ["filter_cutoff_period", "filter_order", "filter_window_type", + "filter_dim", "filter_add_unfiltered"] + + DEFAULT_FILTER_DIM = "filter" + DEFAULT_WINDOW_TYPE = ("kaiser", 5) + DEFAULT_ADD_UNFILTERED = False + + def __init__(self, *args, filter_cutoff_period, filter_order, filter_window_type=DEFAULT_WINDOW_TYPE, + filter_dim=DEFAULT_FILTER_DIM, filter_add_unfiltered=DEFAULT_ADD_UNFILTERED, **kwargs): + # self._check_sampling(**kwargs) + # self.original_data = None # ToDo: implement here something to store unfiltered data + self.filter_cutoff_period = (lambda x: [x] if isinstance(x, tuple) else to_list(x))(filter_cutoff_period) + self.filter_cutoff_freq = self._period_to_freq(self.filter_cutoff_period) + assert len(self.filter_cutoff_period) == len(filter_order) + self.filter_order = filter_order + self.filter_window_type = filter_window_type + self.filter_dim = filter_dim + self._add_unfiltered = filter_add_unfiltered + self.fs = self._get_fs(**kwargs) + + super().__init__(*args, **kwargs) + + @staticmethod + def _period_to_freq(cutoff_p): + return list(map(lambda x: (1. / x[0] if x[0] is not None else None, 1. / x[1] if x[1] is not None else None), + cutoff_p)) + + def setup_transformation(self, transformation: Union[None, dict, Tuple]) -> Tuple[Optional[dict], Optional[dict]]: + """ + Adjust setup of transformation because kfz filtered data will have negative values which is not compatible with + the log transformation. Therefore, replace all log transformation methods by a default standardization. This is + only applied on input side. + """ + transformation = super(__class__, self).setup_transformation(transformation) + if transformation[0] is not None: + for k, v in transformation[0].items(): + if v["method"] == "log": + transformation[0][k]["method"] = "standardise" + return transformation + + @staticmethod + def _get_fs(**kwargs): + """Return frequency in 1/day (not Hz)""" + sampling = kwargs.get("sampling") + if sampling == "daily": + return 1 + elif sampling == "hourly": + return 24 + else: + raise ValueError(f"Unknown sampling rate {sampling}. Only daily and hourly resolution is supported.") + + def _check_sampling(self, **kwargs): + assert kwargs.get("sampling") == "hourly" # This data handler requires hourly data resolution, does it? + + def make_input_target(self): + data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, + self.station_type, self.network, self.store_data_locally, self.data_origin) + self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, + limit=self.interpolation_limit) + self.set_inputs_and_targets() + self.apply_fir_filter() + # this is just a code snippet to check the results of the kz filter + # import matplotlib + # matplotlib.use("TkAgg") + # import matplotlib.pyplot as plt + # self.input_data.sel(filter="low", variables="temp", Stations="DEBW107").plot() + # self.input_data.sel(variables="temp", Stations="DEBW107").plot.line(hue="filter") + + @TimeTrackingWrapper + def apply_fir_filter(self): + """Apply FIR filter only on inputs.""" + fir = FIRFilter(self.input_data, self.fs, self.filter_order, self.filter_cutoff_freq, self.filter_window_type, + self.target_dim) + self.fir_coeff = fir.filter_coefficients() + fir_data = fir.filtered_data() + if self._add_unfiltered is True: + fir_data.append(self.input_data) + self.input_data = xr.concat(fir_data, pd.Index(self.create_filter_index(), name=self.filter_dim)) + + def create_filter_index(self) -> pd.Index: + """ + Create name for filter dimension. Use 'high' or 'low' for high/low pass data and 'bandi' for band pass data with + increasing numerator i (starting from 1). If 1 low, 2 band, and 1 high pass filter is used the filter index will + become to ['low', 'band1', 'band2', 'high']. + """ + index = [] + band_num = 1 + for (low, high) in self.filter_cutoff_period: + if low is None: + index.append("low") + elif high is None: + index.append("high") + else: + index.append(f"band{band_num}") + band_num += 1 + if self._add_unfiltered: + index.append("unfiltered") + return pd.Index(index, name=self.filter_dim) + + def get_transposed_history(self) -> xr.DataArray: + """Return history. + + :return: history with dimensions datetime, window, Stations, variables, filter. + """ + return self.history.transpose(self.time_dim, self.window_dim, self.iter_dim, self.target_dim, + self.filter_dim).copy() + + def _create_lazy_data(self): + return [self._data, self.meta, self.input_data, self.target_data, self.cutoff_period, self.cutoff_period_days] + + def _extract_lazy(self, lazy_data): + _data, self.meta, _input_data, _target_data, self.cutoff_period, self.cutoff_period_days = lazy_data + f_prep = partial(self._slice_prep, start=self.start, end=self.end) + self._data, self.input_data, self.target_data = list(map(f_prep, [_data, _input_data, _target_data])) + + +class DataHandlerFirFilter(DefaultDataHandler): + """Data handler using FIR filtered data.""" + + data_handler = DataHandlerFirFilterSingleStation + data_handler_transformation = DataHandlerFirFilterSingleStation + _requirements = data_handler.requirements() + + +class DataHandlerKzFilterSingleStation(DataHandlerSingleStation): + """Data handler for a single station to be used by a superior data handler. Inputs are kz filtered.""" + + _requirements = remove_items(inspect.getfullargspec(DataHandlerSingleStation).args, ["self", "station"]) + _hash = DataHandlerSingleStation._hash + ["kz_filter_length", "kz_filter_iter", "filter_dim"] + + DEFAULT_FILTER_DIM = "filter" + + def __init__(self, *args, kz_filter_length, kz_filter_iter, filter_dim=DEFAULT_FILTER_DIM, **kwargs): + self._check_sampling(**kwargs) + # self.original_data = None # ToDo: implement here something to store unfiltered data + self.kz_filter_length = to_list(kz_filter_length) + self.kz_filter_iter = to_list(kz_filter_iter) + self.filter_dim = filter_dim + self.cutoff_period = None + self.cutoff_period_days = None + super().__init__(*args, **kwargs) + + def setup_transformation(self, transformation: Union[None, dict, Tuple]) -> Tuple[Optional[dict], Optional[dict]]: + """ + Adjust setup of transformation because kfz filtered data will have negative values which is not compatible with + the log transformation. Therefore, replace all log transformation methods by a default standardization. This is + only applied on input side. + """ + transformation = super(__class__, self).setup_transformation(transformation) + if transformation[0] is not None: + for k, v in transformation[0].items(): + if v["method"] == "log": + transformation[0][k]["method"] = "standardise" + return transformation + + def _check_sampling(self, **kwargs): + assert kwargs.get("sampling") == "hourly" # This data handler requires hourly data resolution + + def make_input_target(self): + data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, + self.station_type, self.network, self.store_data_locally, self.data_origin) + self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, + limit=self.interpolation_limit) + self.set_inputs_and_targets() + self.apply_kz_filter() + # this is just a code snippet to check the results of the kz filter + # import matplotlib + # matplotlib.use("TkAgg") + # import matplotlib.pyplot as plt + # self.input_data.sel(filter="74d", variables="temp", Stations="DEBW107").plot() + # self.input_data.sel(variables="temp", Stations="DEBW107").plot.line(hue="filter") + + @TimeTrackingWrapper + def apply_kz_filter(self): + """Apply kolmogorov zurbenko filter only on inputs.""" + kz = KZFilter(self.input_data, wl=self.kz_filter_length, itr=self.kz_filter_iter, filter_dim=self.time_dim) + filtered_data: List[xr.DataArray] = kz.run() + self.cutoff_period = kz.period_null() + self.cutoff_period_days = kz.period_null_days() + self.input_data = xr.concat(filtered_data, pd.Index(self.create_filter_index(), name=self.filter_dim)) + + def create_filter_index(self) -> pd.Index: + """ + Round cut off periods in days and append 'res' for residuum index. + + Round small numbers (<10) to single decimal, and higher numbers to int. Transform as list of str and append + 'res' for residuum index. + """ + index = np.round(self.cutoff_period_days, 1) + f = lambda x: int(np.round(x)) if x >= 10 else np.round(x, 1) + index = list(map(f, index.tolist())) + index = list(map(lambda x: str(x) + "d", index)) + ["res"] + return pd.Index(index, name=self.filter_dim) + + def get_transposed_history(self) -> xr.DataArray: + """Return history. + + :return: history with dimensions datetime, window, Stations, variables, filter. + """ + return self.history.transpose(self.time_dim, self.window_dim, self.iter_dim, self.target_dim, + self.filter_dim).copy() + + def _create_lazy_data(self): + return [self._data, self.meta, self.input_data, self.target_data, self.cutoff_period, self.cutoff_period_days] + + def _extract_lazy(self, lazy_data): + _data, self.meta, _input_data, _target_data, self.cutoff_period, self.cutoff_period_days = lazy_data + f_prep = partial(self._slice_prep, start=self.start, end=self.end) + self._data, self.input_data, self.target_data = list(map(f_prep, [_data, _input_data, _target_data])) + + +class DataHandlerKzFilter(DefaultDataHandler): + """Data handler using kz filtered data.""" + + data_handler = DataHandlerKzFilterSingleStation + data_handler_transformation = DataHandlerKzFilterSingleStation + _requirements = data_handler.requirements() diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py new file mode 100644 index 0000000000000000000000000000000000000000..ad2fd12d41dac6902ba8e8a078b52165f1d130c8 --- /dev/null +++ b/mlair/helpers/filter.py @@ -0,0 +1,284 @@ +import gc +import warnings +from typing import Union + +import numpy as np +from matplotlib import pyplot as plt +from scipy import signal +import xarray as xr + +from mlair.helpers import to_list, TimeTrackingWrapper + + +class FIRFilter: + + def __init__(self, data, fs, order, cutoff, window, dim): + + filtered = [] + h = [] + for i in range(len(order)): + fi, hi = self.apply_fir_filter(data, fs, order[i], cutoff_low=cutoff[i][0], cutoff_high=cutoff[i][1], + window=window, dim=dim) + filtered.append(fi) + h.append(hi) + + self._filtered = filtered + self._h = h + + def filter_coefficients(self): + return self._h + + def filtered_data(self): + return self._filtered + # + # y, h = fir_filter(station_data.values.flatten(), fs, order[0], cutoff_low=cutoff[0][0], cutoff_high=cutoff[0][1], + # window=window) + # filtered = xr.ones_like(station_data) * y.reshape(station_data.values.shape) + # # band pass + # y_band, h_band = fir_filter(station_data.values.flatten(), fs, order[1], cutoff_low=cutoff[1][0], + # cutoff_high=cutoff[1][1], window=window) + # filtered_band = xr.ones_like(station_data) * y_band.reshape(station_data.values.shape) + # # band pass 2 + # y_band_2, h_band_2 = fir_filter(station_data.values.flatten(), fs, order[2], cutoff_low=cutoff[2][0], + # cutoff_high=cutoff[2][1], window=window) + # filtered_band_2 = xr.ones_like(station_data) * y_band_2.reshape(station_data.values.shape) + # # high pass + # y_high, h_high = fir_filter(station_data.values.flatten(), fs, order[3], cutoff_low=cutoff[3][0], + # cutoff_high=cutoff[3][1], window=window) + # filtered_high = xr.ones_like(station_data) * y_high.reshape(station_data.values.shape) + + def apply_fir_filter(self, data, fs, order=5, cutoff_low=None, cutoff_high=None, window="hamming", dim="variables"): + + # create fir filter coeffs + cutoff = [] + if cutoff_low is not None: + cutoff += [cutoff_low] + if cutoff_high is not None: + cutoff += [cutoff_high] + if len(cutoff) == 2: + filter_type = "bandpass" + elif len(cutoff) == 1 and cutoff_low is not None: + filter_type = "highpass" + elif len(cutoff) == 1 and cutoff_high is not None: + filter_type = "lowpass" + else: + raise ValueError("Please provide either cutoff_low or cutoff_high.") + h = signal.firwin(order, cutoff, pass_zero=filter_type, fs=fs, window=window) + + # filter data + filtered = xr.ones_like(data) + for var in data.coords[dim]: + d = data.sel({dim: var}).values.flatten() + y = signal.lfilter(h, 1., d) + filtered.loc[{dim: var}] = y + return filtered, h + + +class KolmogorovZurbenkoBaseClass: + + def __init__(self, df, wl, itr, is_child=False, filter_dim="window"): + """ + It create the variables associate with the Kolmogorov-Zurbenko-filter. + + Args: + df(pd.DataFrame, None): time series of a variable + wl(list of int): window length + itr(list of int): number of iteration + """ + self.df = df + self.filter_dim = filter_dim + self.wl = to_list(wl) + self.itr = to_list(itr) + if abs(len(self.wl) - len(self.itr)) > 0: + raise ValueError("Length of lists for wl and itr must agree!") + self._isChild = is_child + self.child = self.set_child() + self.type = type(self).__name__ + + def set_child(self): + if len(self.wl) > 1: + return KolmogorovZurbenkoBaseClass(None, self.wl[1:], self.itr[1:], True, self.filter_dim) + else: + return None + + def kz_filter(self, df, m, k): + pass + + def spectral_calc(self): + df_start = self.df + kz = self.kz_filter(df_start, self.wl[0], self.itr[0]) + filtered = self.subtract(df_start, kz) + # case I: no child avail -> return kz and remaining + if self.child is None: + return [kz, filtered] + # case II: has child -> return current kz and all child results + else: + self.child.df = filtered + kz_next = self.child.spectral_calc() + return [kz] + kz_next + + @staticmethod + def subtract(minuend, subtrahend): + try: # pandas implementation + return minuend.sub(subtrahend, axis=0) + except AttributeError: # general implementation + return minuend - subtrahend + + def run(self): + return self.spectral_calc() + + def transfer_function(self): + m = self.wl[0] + k = self.itr[0] + omega = np.linspace(0.00001, 0.15, 5000) + return omega, (np.sin(m * np.pi * omega) / (m * np.sin(np.pi * omega))) ** (2 * k) + + def omega_null(self, alpha=0.5): + a = np.sqrt(6) / np.pi + b = 1 / (2 * np.array(self.itr)) + c = 1 - alpha ** b + d = np.array(self.wl) ** 2 - alpha ** b + return a * np.sqrt(c / d) + + def period_null(self, alpha=0.5): + return 1. / self.omega_null(alpha) + + def period_null_days(self, alpha=0.5): + return self.period_null(alpha) / 24. + + def plot_transfer_function(self, fig=None, name=None): + if fig is None: + fig = plt.figure() + omega, transfer_function = self.transfer_function() + if self.child is not None: + transfer_function_child = self.child.plot_transfer_function(fig) + else: + transfer_function_child = transfer_function * 0 + plt.semilogx(omega, transfer_function - transfer_function_child, + label="m={:3.0f}, k={:3.0f}, T={:6.2f}d".format(self.wl[0], + self.itr[0], + self.period_null_days())) + plt.axvline(x=self.omega_null()) + if not self._isChild: + locs, labels = plt.xticks() + plt.xticks(locs, np.round(1. / (locs * 24), 1)) + plt.xlim([0.00001, 0.15]) + plt.legend() + if name is None: + plt.show() + else: + plt.savefig(name) + else: + return transfer_function + + +class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass): + + def __init__(self, df, wl: Union[list, int], itr: Union[list, int], is_child=False, filter_dim="window", + method="mean", percentile=0.5): + """ + It create the variables associate with the KolmogorovZurbenkoFilterMovingWindow class. + + Args: + df(pd.DataFrame, xr.DataArray): time series of a variable + wl: window length + itr: number of iteration + """ + self.valid_methods = ["mean", "percentile", "median", "max", "min"] + if method not in self.valid_methods: + raise ValueError("Method '{}' is not supported. Please select from [{}].".format( + method, ", ".join(self.valid_methods))) + else: + self.method = method + if percentile > 1 or percentile < 0: + raise ValueError("Percentile must be in range [0, 1]. Given was {}!".format(percentile)) + else: + self.percentile = percentile + super().__init__(df, wl, itr, is_child, filter_dim) + + def set_child(self): + if len(self.wl) > 1: + return KolmogorovZurbenkoFilterMovingWindow(self.df, self.wl[1:], self.itr[1:], is_child=True, + filter_dim=self.filter_dim, method=self.method, + percentile=self.percentile) + else: + return None + + @TimeTrackingWrapper + def kz_filter_new(self, df, wl, itr): + """ + It passes the low frequency time series. + + If filter method is from mean, max, min this method will call construct and rechunk before the actual + calculation to improve performance. If filter method is either median or percentile this approach is not + applicable and depending on the data and window size, this method can become slow. + + Args: + wl(int): a window length + itr(int): a number of iteration + """ + warnings.filterwarnings("ignore") + df_itr = df.__deepcopy__() + try: + kwargs = {"min_periods": int(0.7 * wl), + "center": True, + self.filter_dim: wl} + for i in np.arange(0, itr): + print(i) + rolling = df_itr.chunk().rolling(**kwargs) + if self.method not in ["percentile", "median"]: + rolling = rolling.construct("construct").chunk("auto") + if self.method == "median": + df_mv_avg_tmp = rolling.median() + elif self.method == "percentile": + df_mv_avg_tmp = rolling.quantile(self.percentile) + elif self.method == "max": + df_mv_avg_tmp = rolling.max("construct") + elif self.method == "min": + df_mv_avg_tmp = rolling.min("construct") + else: + df_mv_avg_tmp = rolling.mean("construct") + df_itr = df_mv_avg_tmp.compute() + del df_mv_avg_tmp, rolling + gc.collect() + return df_itr + except ValueError: + raise ValueError + + @TimeTrackingWrapper + def kz_filter(self, df, wl, itr): + """ + It passes the low frequency time series. + + Args: + wl(int): a window length + itr(int): a number of iteration + """ + import warnings + warnings.filterwarnings("ignore") + df_itr = df.__deepcopy__() + try: + kwargs = {"min_periods": int(0.7 * wl), + "center": True, + self.filter_dim: wl} + iter_vars = df_itr.coords["variables"].values + for var in iter_vars: + df_itr_var = df_itr.sel(variables=[var]) + for _ in np.arange(0, itr): + df_itr_var = df_itr_var.chunk() + rolling = df_itr_var.rolling(**kwargs) + if self.method == "median": + df_mv_avg_tmp = rolling.median() + elif self.method == "percentile": + df_mv_avg_tmp = rolling.quantile(self.percentile) + elif self.method == "max": + df_mv_avg_tmp = rolling.max() + elif self.method == "min": + df_mv_avg_tmp = rolling.min() + else: + df_mv_avg_tmp = rolling.mean() + df_itr_var = df_mv_avg_tmp.compute() + df_itr.loc[{"variables": [var]}] = df_itr_var + return df_itr + except ValueError: + raise ValueError diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py index 30391998c65950f12fc6824626638788e1bd721b..0ee950981a716164e2e9f4e9b8876a4de69ecd19 100644 --- a/mlair/helpers/statistics.py +++ b/mlair/helpers/statistics.py @@ -9,12 +9,7 @@ import numpy as np import xarray as xr import pandas as pd from typing import Union, Tuple, Dict, List -from matplotlib import pyplot as plt import itertools -import gc -import warnings - -from mlair.helpers import to_list, TimeTracking, TimeTrackingWrapper Data = Union[xr.DataArray, pd.DataFrame] @@ -483,212 +478,3 @@ class SkillScores: return monthly_mean - -class KolmogorovZurbenkoBaseClass: - - def __init__(self, df, wl, itr, is_child=False, filter_dim="window"): - """ - It create the variables associate with the Kolmogorov-Zurbenko-filter. - - Args: - df(pd.DataFrame, None): time series of a variable - wl(list of int): window length - itr(list of int): number of iteration - """ - self.df = df - self.filter_dim = filter_dim - self.wl = to_list(wl) - self.itr = to_list(itr) - if abs(len(self.wl) - len(self.itr)) > 0: - raise ValueError("Length of lists for wl and itr must agree!") - self._isChild = is_child - self.child = self.set_child() - self.type = type(self).__name__ - - def set_child(self): - if len(self.wl) > 1: - return KolmogorovZurbenkoBaseClass(None, self.wl[1:], self.itr[1:], True, self.filter_dim) - else: - return None - - def kz_filter(self, df, m, k): - pass - - def spectral_calc(self): - df_start = self.df - kz = self.kz_filter(df_start, self.wl[0], self.itr[0]) - filtered = self.subtract(df_start, kz) - # case I: no child avail -> return kz and remaining - if self.child is None: - return [kz, filtered] - # case II: has child -> return current kz and all child results - else: - self.child.df = filtered - kz_next = self.child.spectral_calc() - return [kz] + kz_next - - @staticmethod - def subtract(minuend, subtrahend): - try: # pandas implementation - return minuend.sub(subtrahend, axis=0) - except AttributeError: # general implementation - return minuend - subtrahend - - def run(self): - return self.spectral_calc() - - def transfer_function(self): - m = self.wl[0] - k = self.itr[0] - omega = np.linspace(0.00001, 0.15, 5000) - return omega, (np.sin(m * np.pi * omega) / (m * np.sin(np.pi * omega))) ** (2 * k) - - def omega_null(self, alpha=0.5): - a = np.sqrt(6) / np.pi - b = 1 / (2 * np.array(self.itr)) - c = 1 - alpha ** b - d = np.array(self.wl) ** 2 - alpha ** b - return a * np.sqrt(c / d) - - def period_null(self, alpha=0.5): - return 1. / self.omega_null(alpha) - - def period_null_days(self, alpha=0.5): - return self.period_null(alpha) / 24. - - def plot_transfer_function(self, fig=None, name=None): - if fig is None: - fig = plt.figure() - omega, transfer_function = self.transfer_function() - if self.child is not None: - transfer_function_child = self.child.plot_transfer_function(fig) - else: - transfer_function_child = transfer_function * 0 - plt.semilogx(omega, transfer_function - transfer_function_child, - label="m={:3.0f}, k={:3.0f}, T={:6.2f}d".format(self.wl[0], - self.itr[0], - self.period_null_days())) - plt.axvline(x=self.omega_null()) - if not self._isChild: - locs, labels = plt.xticks() - plt.xticks(locs, np.round(1. / (locs * 24), 1)) - plt.xlim([0.00001, 0.15]) - plt.legend() - if name is None: - plt.show() - else: - plt.savefig(name) - else: - return transfer_function - - -class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass): - - def __init__(self, df, wl: Union[list, int], itr: Union[list, int], is_child=False, filter_dim="window", - method="mean", percentile=0.5): - """ - It create the variables associate with the KolmogorovZurbenkoFilterMovingWindow class. - - Args: - df(pd.DataFrame, xr.DataArray): time series of a variable - wl: window length - itr: number of iteration - """ - self.valid_methods = ["mean", "percentile", "median", "max", "min"] - if method not in self.valid_methods: - raise ValueError("Method '{}' is not supported. Please select from [{}].".format( - method, ", ".join(self.valid_methods))) - else: - self.method = method - if percentile > 1 or percentile < 0: - raise ValueError("Percentile must be in range [0, 1]. Given was {}!".format(percentile)) - else: - self.percentile = percentile - super().__init__(df, wl, itr, is_child, filter_dim) - - def set_child(self): - if len(self.wl) > 1: - return KolmogorovZurbenkoFilterMovingWindow(self.df, self.wl[1:], self.itr[1:], is_child=True, - filter_dim=self.filter_dim, method=self.method, - percentile=self.percentile) - else: - return None - - @TimeTrackingWrapper - def kz_filter_new(self, df, wl, itr): - """ - It passes the low frequency time series. - - If filter method is from mean, max, min this method will call construct and rechunk before the actual - calculation to improve performance. If filter method is either median or percentile this approach is not - applicable and depending on the data and window size, this method can become slow. - - Args: - wl(int): a window length - itr(int): a number of iteration - """ - warnings.filterwarnings("ignore") - df_itr = df.__deepcopy__() - try: - kwargs = {"min_periods": int(0.7 * wl), - "center": True, - self.filter_dim: wl} - for i in np.arange(0, itr): - print(i) - rolling = df_itr.chunk().rolling(**kwargs) - if self.method not in ["percentile", "median"]: - rolling = rolling.construct("construct").chunk("auto") - if self.method == "median": - df_mv_avg_tmp = rolling.median() - elif self.method == "percentile": - df_mv_avg_tmp = rolling.quantile(self.percentile) - elif self.method == "max": - df_mv_avg_tmp = rolling.max("construct") - elif self.method == "min": - df_mv_avg_tmp = rolling.min("construct") - else: - df_mv_avg_tmp = rolling.mean("construct") - df_itr = df_mv_avg_tmp.compute() - del df_mv_avg_tmp, rolling - gc.collect() - return df_itr - except ValueError: - raise ValueError - - @TimeTrackingWrapper - def kz_filter(self, df, wl, itr): - """ - It passes the low frequency time series. - - Args: - wl(int): a window length - itr(int): a number of iteration - """ - import warnings - warnings.filterwarnings("ignore") - df_itr = df.__deepcopy__() - try: - kwargs = {"min_periods": int(0.7 * wl), - "center": True, - self.filter_dim: wl} - iter_vars = df_itr.coords["variables"].values - for var in iter_vars: - df_itr_var = df_itr.sel(variables=[var]) - for _ in np.arange(0, itr): - df_itr_var = df_itr_var.chunk() - rolling = df_itr_var.rolling(**kwargs) - if self.method == "median": - df_mv_avg_tmp = rolling.median() - elif self.method == "percentile": - df_mv_avg_tmp = rolling.quantile(self.percentile) - elif self.method == "max": - df_mv_avg_tmp = rolling.max() - elif self.method == "min": - df_mv_avg_tmp = rolling.min() - else: - df_mv_avg_tmp = rolling.mean() - df_itr_var = df_mv_avg_tmp.compute() - df_itr.loc[{"variables": [var]}] = df_itr_var - return df_itr - except ValueError: - raise ValueError diff --git a/test/test_data_handler/test_data_handler_mixed_sampling.py b/test/test_data_handler/test_data_handler_mixed_sampling.py index 2a6553b7f495bb4eb8aeddf7c39f2f2517edc967..19899a77471a66bbf437e2e0b365fcfb7eeab17f 100644 --- a/test/test_data_handler/test_data_handler_mixed_sampling.py +++ b/test/test_data_handler/test_data_handler_mixed_sampling.py @@ -5,7 +5,7 @@ from mlair.data_handler.data_handler_mixed_sampling import DataHandlerMixedSampl DataHandlerMixedSamplingSingleStation, DataHandlerMixedSamplingWithFilter, \ DataHandlerMixedSamplingWithFilterSingleStation, DataHandlerSeparationOfScales, \ DataHandlerSeparationOfScalesSingleStation -from mlair.data_handler.data_handler_kz_filter import DataHandlerKzFilterSingleStation +from mlair.data_handler.data_handler_with_filter import DataHandlerKzFilterSingleStation from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation from mlair.helpers import remove_items from mlair.configuration.defaults import DEFAULT_INTERPOLATION_METHOD