Skip to content
Snippets Groups Projects
Commit 07c8bcf5 authored by leufen1's avatar leufen1
Browse files

added kz filter to statistics

parent 0efdfb16
No related branches found
No related tags found
3 merge requests!192include Develop,!191Resolve "release v1.1.0",!177Resolve "KZ Filter creating additional dimension"
Pipeline #50333 passed
......@@ -9,6 +9,9 @@ import numpy as np
import xarray as xr
import pandas as pd
from typing import Union, Tuple, Dict
from matplotlib import pyplot as plt
from mlair.helpers import to_list
Data = Union[xr.DataArray, pd.DataFrame]
......@@ -345,3 +348,166 @@ class SkillScores:
monthly_mean[monthly_mean.index.dt.month == month, :] = mu[mu.month == month].values
return monthly_mean
class KolmogorovZurbenkoBaseClass:
def __init__(self, df, wl, itr, is_child=False, filter_dim="window"):
"""
It create the variables associate with the Kolmogorov-Zurbenko-filter.
Args:
df(pd.DataFrame): time series of a variable
wl(list of int): window length
itr(list of int): number of iteration
"""
self.df = df
self.filter_dim = filter_dim
self.wl = to_list(wl)
self.itr = to_list(itr)
if abs(len(self.wl) - len(self.itr)) > 0:
raise ValueError("Length of lists for wl and itr must agree!")
self._isChild = is_child
self.child = self.set_child()
self.type = type(self).__name__
def set_child(self):
if len(self.wl) > 1:
return KolmogorovZurbenkoBaseClass(self.df, self.wl[1:], self.itr[1:], True, self.filter_dim)
else:
return None
def kz_filter(self, m, k):
pass
def spectral_calc(self):
kz = self.kz_filter(self.wl[0], self.itr[0])
if self.child is None:
if not self._isChild:
return [self.subtract(self.df, kz), kz]
else:
return [kz, kz]
else:
if not self._isChild:
kz_next = self.child.spectral_calc()
return [self.subtract(self.df, kz), self.subtract(kz, kz_next[0])] + kz_next[1:]
else:
kz_next = self.child.spectral_calc()
return [kz, self.subtract(kz, kz_next[0])] + kz_next[1:]
@staticmethod
def subtract(minuend, subtrahend):
try: # pandas implementation
return minuend.sub(subtrahend, axis=0)
except AttributeError: # general implementation
return minuend - subtrahend
def run(self):
return self.spectral_calc()
def transfer_function(self):
m = self.wl[0]
k = self.itr[0]
omega = np.linspace(0.00001, 0.15, 5000)
return omega, (np.sin(m * np.pi * omega) / (m * np.sin(np.pi * omega))) ** (2 * k)
def omega_null(self, alpha=0.5):
a = np.sqrt(6) / np.pi
b = 1 / (2 * self.itr[0])
c = 1 - alpha ** b
d = self.wl[0] ** 2 - alpha ** b
return a * np.sqrt(c / d)
def period_null(self, alpha=0.5):
return 1. / self.omega_null(alpha)
def period_null_days(self, alpha=0.5):
return self.period_null(alpha) / 24.
def plot_transfer_function(self, fig=None, name=None):
if fig is None:
fig = plt.figure()
omega, transfer_function = self.transfer_function()
if self.child is not None:
transfer_function_child = self.child.plot_transfer_function(fig)
else:
transfer_function_child = transfer_function * 0
plt.semilogx(omega, transfer_function - transfer_function_child,
label="m={:3.0f}, k={:3.0f}, T={:6.2f}d".format(self.wl[0],
self.itr[0],
self.period_null_days()))
plt.axvline(x=self.omega_null())
if not self._isChild:
locs, labels = plt.xticks()
plt.xticks(locs, np.round(1. / (locs * 24), 1))
plt.xlim([0.00001, 0.15])
plt.legend()
if name is None:
plt.show()
else:
plt.savefig(name)
else:
return transfer_function
class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass):
def __init__(self, df, wl: Union[list, int], itr: Union[list, int], is_child=False, filter_dim="window",
method="mean", percentile=0.5):
"""
It create the variables associate with the KolmogorovZurbenkoFilterMovingWindow class.
Args:
df(pd.DataFrame): time series of a variable
wl: window length
itr: number of iteration
"""
self.valid_methods = ["mean", "percentile", "median", "max", "min"]
if method not in self.valid_methods:
raise ValueError("Method '{}' is not supported. Please select from [{}].".format(
method, ", ".join(self.valid_methods)))
else:
self.method = method
if percentile > 1 or percentile < 0:
raise ValueError("Percentile must be in range [0, 1]. Given was {}!".format(percentile))
else:
self.percentile = percentile
super().__init__(df, wl, itr, is_child, filter_dim)
def set_child(self):
if len(self.wl) > 1:
return KolmogorovZurbenkoFilterMovingWindow(self.df, self.wl[1:], self.itr[1:], is_child=True,
filter_dim=self.filter_dim, method=self.method,
percentile=self.percentile)
else:
return None
def kz_filter(self, wl, itr):
"""
It passes the low frequency time series.
Args:
wl(int): a window length
itr(int): a number of iteration
"""
df_itr = self.df.__deepcopy__()
try:
kwargs = {"min_periods": 1,
"center": True,
self.filter_dim: wl}
for _ in np.arange(0, itr):
rolling = df_itr.rolling(**kwargs)
if self.method == "median":
df_mv_avg_tmp = rolling.median()
elif self.method == "percentile":
df_mv_avg_tmp = rolling.quantile(self.percentile)
elif self.method == "max":
df_mv_avg_tmp = rolling.max()
elif self.method == "min":
df_mv_avg_tmp = rolling.min()
else:
df_mv_avg_tmp = rolling.mean()
df_itr = df_mv_avg_tmp
return df_itr
except ValueError:
raise ValueError
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment