diff --git a/src/data_handling/data_preparation.py b/src/data_handling/data_preparation.py index e3186778b94375ba1d39fa87ba7d2980c785581e..a2b6ce9045ce12cdbc47001cd0c6b80992dde69e 100644 --- a/src/data_handling/data_preparation.py +++ b/src/data_handling/data_preparation.py @@ -11,8 +11,7 @@ import numpy as np import pandas as pd import xarray as xr -from src import join, helpers -from src import statistics +from src import join, helpers, statistics, kz_filter # define a more general date type for type hinting date = Union[dt.date, dt.datetime] @@ -61,6 +60,7 @@ class DataPrep(object): self.kwargs = kwargs self.data = None self.meta = None + self.data_kz_filter = None self._transform_method = None self.statistics_per_var = kwargs.get("statistics_per_var", None) self.sampling = kwargs.get("sampling", "daily") @@ -420,7 +420,10 @@ class DataPrep(object): def get_transposed_label(self): return self.label.squeeze("Stations").transpose("datetime", "window").copy() + def apply_kz_filter(self): + kz = kz_filter.KolmogorovZurbenkoFilterMovingWindow(self.data, [3, 13], [3, 5], filter_dim="datetime") + self.data_kz_filter = kz.run() if __name__ == "__main__": - dp = DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) + dp = DataPrep('../../data/toar_daily', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) print(dp) diff --git a/src/kz_filter.py b/src/kz_filter.py new file mode 100644 index 0000000000000000000000000000000000000000..1c510925f1e05317c550ec44e19528064da2da12 --- /dev/null +++ b/src/kz_filter.py @@ -0,0 +1,325 @@ +import pandas as pd +import numpy as np +from matplotlib import pyplot as plt + + +class KolmogorovZurbenkoBaseClass: + + def __init__(self, df, wl, itr, is_child=False, filter_dim="window"): + """ + It create the variables associate with the Kolmogorov-Zurbenko-filter. + + Args: + df(pd.DataFrame): time series of a variable + wl(list of int): window length + itr(list of int): number of iteration + """ + self.df = df + self.filter_dim = filter_dim + self.wl = set_param(wl) + self.itr = set_param(itr) + if abs(len(self.wl)-len(self.itr)) > 0: + raise ValueError("Length of lists for wl and itr must agree!") + self._isChild = is_child + self.child = self.set_child() + self.type = type(self).__name__ + + def set_child(self): + if len(self.wl) > 1: + return KolmogorovZurbenkoBaseClass(self.df, self.wl[1:], self.itr[1:], True, self.filter_dim) + else: + return None + + def kz_filter(self, m, k): + pass + + def spectral_calc(self): + kz = self.kz_filter(self.wl[0], self.itr[0]) + if self.child is None: + if not self._isChild: + return [self.df.sub(kz, axis=0), kz] + else: + return [kz, kz] + else: + if not self._isChild: + kz_next = self.child.spectral_calc() + return [self.subtract(self.df, kz), self.subtract(kz, kz_next[0])] + kz_next[1:] + else: + kz_next = self.child.spectral_calc() + return [kz, self.subtract(kz, kz_next[0])] + kz_next[1:] + + @staticmethod + def subtract(minuend, subtrahend): + try: # pandas implementation + return minuend.sub(subtrahend, axis=0) + except AttributeError: # general implementation + return minuend - subtrahend + + + + def run(self): + return self.spectral_calc() + + def transfer_function(self): + m = self.wl[0] + k = self.itr[0] + omega = np.linspace(0.00001, 0.15, 5000) + return omega, (np.sin(m * np.pi * omega) / (m * np.sin(np.pi * omega))) ** (2 * k) + + def omega_null(self, alpha=0.5): + a = np.sqrt(6) / np.pi + b = 1 / (2 * self.itr[0]) + c = 1 - alpha ** b + d = self.wl[0] ** 2 - alpha ** b + return a * np.sqrt(c / d) + + def period_null(self, alpha=0.5): + return 1 / self.omega_null(alpha) + + def period_null_days(self, alpha=0.5): + return self.period_null(alpha) / 24 + + def plot_transfer_function(self, fig=None): + if fig is None: + fig = plt.figure() + omega, transfer_function = self.transfer_function() + if self.child is not None: + transfer_function_child = self.child.plot_transfer_function(fig) + else: + transfer_function_child = transfer_function * 0 + plt.semilogx(omega, transfer_function - transfer_function_child, + label="m={:3.0f}, k={:3.0f}, T={:6.2f}d".format(self.wl[0], + self.itr[0], + self.period_null_days())) + plt.axvline(x=self.omega_null()) + if not self._isChild: + locs, labels = plt.xticks() + plt.xticks(locs, np.round(1. / (locs * 24), 1)) + plt.xlim([0.00001, 0.15]) + plt.legend() + plt.show() + else: + return transfer_function + + +class KolmogorovZurbenkoFilterExplicit(KolmogorovZurbenkoBaseClass): + + def __init__(self, df, wl, itr, is_child=False, filter_dim="window"): + """ + It create the variables associate with the KolmogorovZurbenkoFilterExplicit class. + + Args: + df(pd.DataFrame): time series of a variable + wl(list of int): window length + itr(list of int): number of iteration + """ + super().__init__(df, wl, itr, is_child, filter_dim) + + def set_child(self): + if len(self.wl) > 1: + return KolmogorovZurbenkoFilterExplicit(self.df, self.wl[1:], self.itr[1:], True, self.filter_dim) + else: + return None + + def kz_filter(self, m, k): + """ + It passes the low frequency time series. + + Args: + m(int): a window length + k(int): a number of iteration + """ + df_itr = self.df.__deepcopy__() + + coeff = _kz_coeff(m, k) + prod = _kz_prod(df_itr, coeff, m, k) + kz_sum = _kz_sum(prod, coeff) + + return to_dataframe(kz_sum, df_itr.index, m, k, df_itr.columns) + + +class KolmogorovZurbenkoFourierTransformation(KolmogorovZurbenkoFilterExplicit): + + def __init__(self, df, wl, itr, is_child=False, filter_dim="window"): + super().__init__(df, wl, itr, is_child, filter_dim) + + def kzft(self, data, m, k, nu): + coeff = _kz_coeff(m, k) + data = _kz_prod(data, coeff, m, k) + + s = k * (m - 1) / 2 + s = np.arange(-s, s + 1) + exp = np.exp(-1j * 2 * np.pi * nu * s) + + data = data * exp + kz_sum = _kz_sum(data, coeff) + return to_dataframe(kz_sum, data.index, m, k, data.columns) + + def kzp(self, data, m, k, nu): + + w = k * (m - 1) + 1 + L = int(m / 10) + nt = int((data.size - w) / L +1) + + +class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass): + + def __init__(self, df, wl, itr, is_child=False, filter_dim="window", method="mean", percentile=0.5): + """ + It create the variables associate with the KolmogorovZurbenkoFilterMovingWindow class. + + Args: + df(pd.DataFrame): time series of a variable + wl(list of int): window length + itr(list of int): number of iteration + """ + self.valid_methods = ["mean", "percentile", "median", "max", "min"] + if method not in self.valid_methods: + raise ValueError("Method '{}' is not supported. Please select from [{}].".format( + method, ", ".join(self.valid_methods))) + else: + self.method = method + if percentile > 1 or percentile < 0: + raise ValueError("Percentile must be in range [0, 1]. Given was {}!".format(percentile)) + else: + self.percentile = percentile + super().__init__(df, wl, itr, is_child, filter_dim) + + def set_child(self): + if len(self.wl) > 1: + return KolmogorovZurbenkoFilterMovingWindow(self.df, self.wl[1:], self.itr[1:], is_child=True, + filter_dim=self.filter_dim, method=self.method, + percentile=self.percentile) + else: + return None + + def kz_filter(self, wl, itr): + """ + It passes the low frequency time series. + + Args: + wl(int): a window length + itr(int): a number of iteration + """ + df_itr = self.df.__deepcopy__() + try: + kwargs = {"min_periods": 1, + "center": True, + self.filter_dim: wl} + for _ in np.arange(0, itr): + rolling = df_itr.rolling(**kwargs) + if self.method == "median": + df_mv_avg_tmp = rolling.median() + elif self.method == "percentile": + df_mv_avg_tmp = rolling.quantile(self.percentile) + elif self.method == "max": + df_mv_avg_tmp = rolling.max() + elif self.method == "min": + df_mv_avg_tmp = rolling.min() + else: + df_mv_avg_tmp = rolling.mean() + df_itr = df_mv_avg_tmp + return df_itr + except ValueError: + raise ValueError + + + +def set_param(param): + if not isinstance(param, list): + param = [param] + return param + + +def _kz_sum(data, coeff): + k_nan = np.isnan(data) + + if np.any(k_nan): + + # flag weights for missing values + coeff = np.tile(coeff, (data.shape[0], 1)) + coeff = np.ma.MaskedArray(coeff, mask=k_nan) + k = coeff.mask.any(axis=-1) + + # calculate sum of weights skipping missing data + coeff = np.sum(coeff, axis=-1) + data = np.nansum(data, axis=-1) + + # adjust weighted average if missing data is in sliding window + data[k] = data[k] / coeff[k] + + return data + + else: + return np.sum(data, axis=-1) + + +def _sliding_window(arr, window): + + # get length of array + n = arr.shape[0] + + # create index shifted by one for each new line + indexer = np.arange(window)[None, :] + np.arange(-window+1, n)[:, None] + + # expand on edges with nans + indexer[indexer >= n] = -1 + arr_windows = arr[indexer] + arr_windows[indexer < 0] = np.nan + + return arr_windows[:, :, 0] + + +def _kz_prod(data, coeff, m, k): + + n = data.size + data = _sliding_window(data.values, k * (m - 1) + 1) + assert data.shape == (n + k * (m-1), len(coeff)) + + return data * coeff + + +def _kz_coeff(m, k): + + coeff = np.ones(m) + + for i in range(1, k): + + t = np.zeros((m, m + i * (m - 1))) + for km in range(m): + t[km, km:km + coeff.size] = coeff + + coeff = np.sum(t, axis=0) + + assert coeff.size == k * (m - 1) + 1 + + return coeff / m ** k + + +def to_dataframe(data, index, m, k, columns): + n_expanded = k * (m-1) / 2 + freq = pd.infer_freq(index) + start = index[0] - pd.Timedelta(n_expanded, unit=freq) + end = index[-1] + pd.Timedelta(n_expanded, unit=freq) + index = pd.date_range(start, end, freq=freq) + return pd.DataFrame(data=data, index=index, columns=columns) + + + + +if __name__ == '__main__': + # create a test data + N = 20000 + l = 2000 + my_test_data = np.sin(np.linspace(1, l, N)) + np.random.normal(scale=0.1, size=len(np.linspace(1, l, N))) \ + + np.sin(np.linspace(0, l*2, N)) + test_df = pd.DataFrame(data=my_test_data, index=pd.date_range('2012-10-01', periods=N, freq='1d'), + columns=['value']) + + # run the spectral test + obj = KolmogorovZurbenkoFilterExplicit(test_df, [3, 13, 103], [3, 5, 5]) + ID, DU, SY, BL = obj.run() + + # run the spectral test + obj = KolmogorovZurbenkoFilterMovingWindow(test_df, [3, 13, 103], [3, 5, 5]) + ID2, DU2, SY2, BL2 = obj.run() \ No newline at end of file