Skip to content
Snippets Groups Projects
Commit 1bbe4cbb authored by lukas leufen's avatar lukas leufen
Browse files

added kz filter and first usage try

parent 0f645b8d
Branches
Tags
1 merge request!76WIP: Resolve "KZ filter"
Pipeline #32108 passed
......@@ -11,8 +11,7 @@ import numpy as np
import pandas as pd
import xarray as xr
from src import join, helpers
from src import statistics
from src import join, helpers, statistics, kz_filter
# define a more general date type for type hinting
date = Union[dt.date, dt.datetime]
......@@ -61,6 +60,7 @@ class DataPrep(object):
self.kwargs = kwargs
self.data = None
self.meta = None
self.data_kz_filter = None
self._transform_method = None
self.statistics_per_var = kwargs.get("statistics_per_var", None)
self.sampling = kwargs.get("sampling", "daily")
......@@ -420,7 +420,10 @@ class DataPrep(object):
def get_transposed_label(self):
return self.label.squeeze("Stations").transpose("datetime", "window").copy()
def apply_kz_filter(self):
kz = kz_filter.KolmogorovZurbenkoFilterMovingWindow(self.data, [3, 13], [3, 5], filter_dim="datetime")
self.data_kz_filter = kz.run()
if __name__ == "__main__":
dp = DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
dp = DataPrep('../../data/toar_daily', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
print(dp)
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
class KolmogorovZurbenkoBaseClass:
def __init__(self, df, wl, itr, is_child=False, filter_dim="window"):
"""
It create the variables associate with the Kolmogorov-Zurbenko-filter.
Args:
df(pd.DataFrame): time series of a variable
wl(list of int): window length
itr(list of int): number of iteration
"""
self.df = df
self.filter_dim = filter_dim
self.wl = set_param(wl)
self.itr = set_param(itr)
if abs(len(self.wl)-len(self.itr)) > 0:
raise ValueError("Length of lists for wl and itr must agree!")
self._isChild = is_child
self.child = self.set_child()
self.type = type(self).__name__
def set_child(self):
if len(self.wl) > 1:
return KolmogorovZurbenkoBaseClass(self.df, self.wl[1:], self.itr[1:], True, self.filter_dim)
else:
return None
def kz_filter(self, m, k):
pass
def spectral_calc(self):
kz = self.kz_filter(self.wl[0], self.itr[0])
if self.child is None:
if not self._isChild:
return [self.df.sub(kz, axis=0), kz]
else:
return [kz, kz]
else:
if not self._isChild:
kz_next = self.child.spectral_calc()
return [self.subtract(self.df, kz), self.subtract(kz, kz_next[0])] + kz_next[1:]
else:
kz_next = self.child.spectral_calc()
return [kz, self.subtract(kz, kz_next[0])] + kz_next[1:]
@staticmethod
def subtract(minuend, subtrahend):
try: # pandas implementation
return minuend.sub(subtrahend, axis=0)
except AttributeError: # general implementation
return minuend - subtrahend
def run(self):
return self.spectral_calc()
def transfer_function(self):
m = self.wl[0]
k = self.itr[0]
omega = np.linspace(0.00001, 0.15, 5000)
return omega, (np.sin(m * np.pi * omega) / (m * np.sin(np.pi * omega))) ** (2 * k)
def omega_null(self, alpha=0.5):
a = np.sqrt(6) / np.pi
b = 1 / (2 * self.itr[0])
c = 1 - alpha ** b
d = self.wl[0] ** 2 - alpha ** b
return a * np.sqrt(c / d)
def period_null(self, alpha=0.5):
return 1 / self.omega_null(alpha)
def period_null_days(self, alpha=0.5):
return self.period_null(alpha) / 24
def plot_transfer_function(self, fig=None):
if fig is None:
fig = plt.figure()
omega, transfer_function = self.transfer_function()
if self.child is not None:
transfer_function_child = self.child.plot_transfer_function(fig)
else:
transfer_function_child = transfer_function * 0
plt.semilogx(omega, transfer_function - transfer_function_child,
label="m={:3.0f}, k={:3.0f}, T={:6.2f}d".format(self.wl[0],
self.itr[0],
self.period_null_days()))
plt.axvline(x=self.omega_null())
if not self._isChild:
locs, labels = plt.xticks()
plt.xticks(locs, np.round(1. / (locs * 24), 1))
plt.xlim([0.00001, 0.15])
plt.legend()
plt.show()
else:
return transfer_function
class KolmogorovZurbenkoFilterExplicit(KolmogorovZurbenkoBaseClass):
def __init__(self, df, wl, itr, is_child=False, filter_dim="window"):
"""
It create the variables associate with the KolmogorovZurbenkoFilterExplicit class.
Args:
df(pd.DataFrame): time series of a variable
wl(list of int): window length
itr(list of int): number of iteration
"""
super().__init__(df, wl, itr, is_child, filter_dim)
def set_child(self):
if len(self.wl) > 1:
return KolmogorovZurbenkoFilterExplicit(self.df, self.wl[1:], self.itr[1:], True, self.filter_dim)
else:
return None
def kz_filter(self, m, k):
"""
It passes the low frequency time series.
Args:
m(int): a window length
k(int): a number of iteration
"""
df_itr = self.df.__deepcopy__()
coeff = _kz_coeff(m, k)
prod = _kz_prod(df_itr, coeff, m, k)
kz_sum = _kz_sum(prod, coeff)
return to_dataframe(kz_sum, df_itr.index, m, k, df_itr.columns)
class KolmogorovZurbenkoFourierTransformation(KolmogorovZurbenkoFilterExplicit):
def __init__(self, df, wl, itr, is_child=False, filter_dim="window"):
super().__init__(df, wl, itr, is_child, filter_dim)
def kzft(self, data, m, k, nu):
coeff = _kz_coeff(m, k)
data = _kz_prod(data, coeff, m, k)
s = k * (m - 1) / 2
s = np.arange(-s, s + 1)
exp = np.exp(-1j * 2 * np.pi * nu * s)
data = data * exp
kz_sum = _kz_sum(data, coeff)
return to_dataframe(kz_sum, data.index, m, k, data.columns)
def kzp(self, data, m, k, nu):
w = k * (m - 1) + 1
L = int(m / 10)
nt = int((data.size - w) / L +1)
class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass):
def __init__(self, df, wl, itr, is_child=False, filter_dim="window", method="mean", percentile=0.5):
"""
It create the variables associate with the KolmogorovZurbenkoFilterMovingWindow class.
Args:
df(pd.DataFrame): time series of a variable
wl(list of int): window length
itr(list of int): number of iteration
"""
self.valid_methods = ["mean", "percentile", "median", "max", "min"]
if method not in self.valid_methods:
raise ValueError("Method '{}' is not supported. Please select from [{}].".format(
method, ", ".join(self.valid_methods)))
else:
self.method = method
if percentile > 1 or percentile < 0:
raise ValueError("Percentile must be in range [0, 1]. Given was {}!".format(percentile))
else:
self.percentile = percentile
super().__init__(df, wl, itr, is_child, filter_dim)
def set_child(self):
if len(self.wl) > 1:
return KolmogorovZurbenkoFilterMovingWindow(self.df, self.wl[1:], self.itr[1:], is_child=True,
filter_dim=self.filter_dim, method=self.method,
percentile=self.percentile)
else:
return None
def kz_filter(self, wl, itr):
"""
It passes the low frequency time series.
Args:
wl(int): a window length
itr(int): a number of iteration
"""
df_itr = self.df.__deepcopy__()
try:
kwargs = {"min_periods": 1,
"center": True,
self.filter_dim: wl}
for _ in np.arange(0, itr):
rolling = df_itr.rolling(**kwargs)
if self.method == "median":
df_mv_avg_tmp = rolling.median()
elif self.method == "percentile":
df_mv_avg_tmp = rolling.quantile(self.percentile)
elif self.method == "max":
df_mv_avg_tmp = rolling.max()
elif self.method == "min":
df_mv_avg_tmp = rolling.min()
else:
df_mv_avg_tmp = rolling.mean()
df_itr = df_mv_avg_tmp
return df_itr
except ValueError:
raise ValueError
def set_param(param):
if not isinstance(param, list):
param = [param]
return param
def _kz_sum(data, coeff):
k_nan = np.isnan(data)
if np.any(k_nan):
# flag weights for missing values
coeff = np.tile(coeff, (data.shape[0], 1))
coeff = np.ma.MaskedArray(coeff, mask=k_nan)
k = coeff.mask.any(axis=-1)
# calculate sum of weights skipping missing data
coeff = np.sum(coeff, axis=-1)
data = np.nansum(data, axis=-1)
# adjust weighted average if missing data is in sliding window
data[k] = data[k] / coeff[k]
return data
else:
return np.sum(data, axis=-1)
def _sliding_window(arr, window):
# get length of array
n = arr.shape[0]
# create index shifted by one for each new line
indexer = np.arange(window)[None, :] + np.arange(-window+1, n)[:, None]
# expand on edges with nans
indexer[indexer >= n] = -1
arr_windows = arr[indexer]
arr_windows[indexer < 0] = np.nan
return arr_windows[:, :, 0]
def _kz_prod(data, coeff, m, k):
n = data.size
data = _sliding_window(data.values, k * (m - 1) + 1)
assert data.shape == (n + k * (m-1), len(coeff))
return data * coeff
def _kz_coeff(m, k):
coeff = np.ones(m)
for i in range(1, k):
t = np.zeros((m, m + i * (m - 1)))
for km in range(m):
t[km, km:km + coeff.size] = coeff
coeff = np.sum(t, axis=0)
assert coeff.size == k * (m - 1) + 1
return coeff / m ** k
def to_dataframe(data, index, m, k, columns):
n_expanded = k * (m-1) / 2
freq = pd.infer_freq(index)
start = index[0] - pd.Timedelta(n_expanded, unit=freq)
end = index[-1] + pd.Timedelta(n_expanded, unit=freq)
index = pd.date_range(start, end, freq=freq)
return pd.DataFrame(data=data, index=index, columns=columns)
if __name__ == '__main__':
# create a test data
N = 20000
l = 2000
my_test_data = np.sin(np.linspace(1, l, N)) + np.random.normal(scale=0.1, size=len(np.linspace(1, l, N))) \
+ np.sin(np.linspace(0, l*2, N))
test_df = pd.DataFrame(data=my_test_data, index=pd.date_range('2012-10-01', periods=N, freq='1d'),
columns=['value'])
# run the spectral test
obj = KolmogorovZurbenkoFilterExplicit(test_df, [3, 13, 103], [3, 5, 5])
ID, DU, SY, BL = obj.run()
# run the spectral test
obj = KolmogorovZurbenkoFilterMovingWindow(test_df, [3, 13, 103], [3, 5, 5])
ID2, DU2, SY2, BL2 = obj.run()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment