Skip to content
Snippets Groups Projects
Commit 3bf973c8 authored by leufen1's avatar leufen1
Browse files

filter code has much more comments for understanding, time range of apriori is extended if required

parent 2652975d
Branches
Tags
5 merge requests!319add all changes of dev into release v1.4.0 branch,!318Resolve "release v1.4.0",!317enabled window_lead_time=1,!295Resolve "data handler FIR filter",!259Draft: Resolve "WRF-Datahandler should inherit from SingleStationDatahandler"
Pipeline #67029 passed
......@@ -289,12 +289,12 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
:param apriori: Data to use as apriori information. This should be either a xarray dataarray containing monthly or
any other heuristic to support the clim filter, or a list of such arrays containint heuristics for all residua
in addition. The 2nd can be used together with apriori_type `residuum_stat` which estimates the error of the
in addition. The 2nd can be used together with apriori_type `residuum_stats` which estimates the error of the
residuum when the clim filter should be applied with exogenous parameters. If apriori_type is None/`zeros` data
can be provided, but this is not required in this case.
:param apriori_type: set type of information that is provided to the clim filter. For the first low pass always a
calculated or given statistic is used. For residuum prediction a constant value of zero is assumed if
apriori_type is None or `zeros`, and a climatology of the residuum is used for `residuum_stat`.
apriori_type is None or `zeros`, and a climatology of the residuum is used for `residuum_stats`.
"""
_requirements = remove_items(DataHandlerFirFilterSingleStation.requirements(), "station")
......@@ -312,15 +312,15 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
@TimeTrackingWrapper
def apply_filter(self):
"""Apply FIR filter only on inputs."""
apriori = self.apriori.get(str(self)) if isinstance(self.apriori, dict) else self.apriori
self.apriori = self.apriori.get(str(self)) if isinstance(self.apriori, dict) else self.apriori
climate_filter = ClimateFIRFilter(self.input_data, self.fs, self.filter_order, self.filter_cutoff_freq,
self.filter_window_type, time_dim=self.time_dim, var_dim=self.target_dim,
apriori_type=self.apriori_type, apriori=apriori,
apriori_type=self.apriori_type, apriori=self.apriori,
sel_opts=self.apriori_sel_opts)
self.climate_filter_coeff = climate_filter.filter_coefficients
# store apriori information: store all if residuum_stat method was used, otherwise just store initial apriori
if self.apriori_type == "residuum_stat":
if self.apriori_type == "residuum_stats":
self.apriori = climate_filter.apriori_data
else:
self.apriori = climate_filter.initial_apriori_data
......
import gc
import warnings
from typing import Union
import logging
import datetime
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
......@@ -76,47 +78,137 @@ class ClimateFIRFilter:
apriori_list = to_list(apriori)
input_data = data.__deepcopy__()
for i in range(len(order)):
# calculate climatological filter
fi, hi, apriori = self.clim_filter(input_data, fs, cutoff[i], order[i], apriori=apriori_list[i],
sel_opts=sel_opts, sampling=sampling, time_dim=time_dim, window=window,
var_dim=var_dim)
filtered.append(fi)
h.append(hi)
input_data = input_data - fi # calculate residuum
# calculate residuum
input_data = input_data - fi
# create new apriori information for next iteration if no further apriori is provided
if len(apriori_list) <= i + 1:
if apriori_type is None or apriori_type == "zeros":
apriori_list.append(xr.zeros_like(apriori_list[i])) # zero version
elif apriori_type == "residuum_stats":
if apriori_type is None or apriori_type == "zeros": # zero version
apriori_list.append(xr.zeros_like(apriori_list[i]))
elif apriori_type == "residuum_stats": # calculate monthly statistic on residuum
apriori_list.append(-self.create_monthly_mean(input_data, sel_opts=sel_opts, sampling=sampling,
time_dim=time_dim))
else:
raise ValueError(f"Cannot handle unkown apriori type: {apriori_type}. Please choose from None, "
f"`zeros` or `residuum_stats`.")
# add residuum to filtered
# add last residuum to filtered
filtered.append(input_data)
self._filtered = filtered
self._h = h
self._apriori = apriori_list
@staticmethod
def create_monthly_mean(data, sel_opts=None, sampling="1d", time_dim="datetime"):
monthly = xr.ones_like(data)
def create_unity_array(data, time_dim, extend_range=365):
"""Create a xr data array filled with ones. time_dim is extended by extend_range days in future and past."""
coords = data.coords
# extend time_dim by given extend_range days
start = coords[time_dim][0].values.astype("datetime64[D]") - np.timedelta64(extend_range, "D")
end = coords[time_dim][-1].values.astype("datetime64[D]") + np.timedelta64(extend_range, "D")
new_time_axis = np.arange(start, end).astype("datetime64[ns]")
# construct data array with updated coords
new_coords = {k: data.coords[k].values if k != time_dim else new_time_axis for k in coords}
new_array = xr.DataArray(1, coords=new_coords, dims=new_coords.keys()).transpose(*data.dims)
# loffset is required because resampling uses last day in month as resampling timestamp
return new_array.resample({time_dim: "1m"}, loffset=datetime.timedelta(days=-15)).max()
def create_monthly_mean(self, data, sel_opts=None, sampling="1d", time_dim="datetime"):
"""Calculate monthly statistics."""
# create unity xarray in monthly resolution with sampling point in mid of each month
monthly = self.create_unity_array(data, time_dim)
# apply selection if given (only use subset for monthly means)
if sel_opts is not None:
data = data.sel(**sel_opts)
# create monthly mean and replace entries in unity array
monthly_mean = data.groupby(f"{time_dim}.month").mean()
for month in monthly_mean.month.values:
loc = (monthly[f"{time_dim}.month"] == month)
monthly.loc[{time_dim: loc}] = monthly_mean.sel(month=month)
return monthly.resample({time_dim: "1m"}).mean().resample({time_dim: sampling}).interpolate()
# aggregate monthly information (shift by half month, because resample base is last day)
return monthly.resample({time_dim: "1m"}).max().resample({time_dim: sampling}).interpolate()
@staticmethod
def extend_apriori(data, apriori, time_dim):
"""
Extend time range of apriori information.
This method will fail, if apriori is available for a shorter period than the gab to fill.
"""
dates = data.coords[time_dim].values
# apriori starts after data
if dates[0] < apriori.coords[time_dim].values[0]:
# add difference in full years
date_diff = abs(dates[0] - apriori.coords[time_dim].values[0]).astype("timedelta64[D]")
extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365
coords = apriori.coords
# create new time axis
start = coords[time_dim][0].values.astype("datetime64[D]") - np.timedelta64(extend_range, "D")
end = coords[time_dim][0].values.astype("datetime64[D]")
new_time_axis = np.arange(start, end).astype("datetime64[ns]")
# extract old values to use with new axis
start = coords[time_dim][0].values.astype("datetime64[D]")
end = coords[time_dim][0].values.astype("datetime64[D]") + np.timedelta64(extend_range - 1, "D")
new_values = apriori.sel({time_dim: slice(start, end)})
new_values.coords[time_dim] = new_time_axis
# add new values to apriori
apriori = apriori.combine_first(new_values)
# apriori ends before data
if dates[-1] + np.timedelta64(365, "D") > apriori.coords[time_dim].values[-1]:
# add difference in full years + 1 year (because apriori is used as future estimate)
date_diff = abs(dates[-1] - apriori.coords[time_dim].values[-1]).astype("timedelta64[D]")
extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365 + 365
coords = apriori.coords
# create new time axis
start = coords[time_dim][-1].values.astype("datetime64[D]")
end = coords[time_dim][-1].values.astype("datetime64[D]") + np.timedelta64(extend_range, "D")
new_time_axis = np.arange(start, end).astype("datetime64[ns]")
# extract old values to use with new axis
start = coords[time_dim][-1].values.astype("datetime64[D]") - np.timedelta64(extend_range - 1, "D")
end = coords[time_dim][-1].values.astype("datetime64[D]")
new_values = apriori.sel({time_dim: slice(start, end)})
new_values.coords[time_dim] = new_time_axis
# add new values to apriori
apriori = apriori.combine_first(new_values)
return apriori
def clim_filter(self, data, fs, cutoff_high, order, apriori=None, padlen=None, sel_opts=None, sampling="1d",
time_dim="datetime", var_dim="variables", window="hamming"):
# calculate apriori information from data if not given and extend its range if not sufficient long enough
if apriori is None:
apriori = self.create_monthly_mean(data, sel_opts=sel_opts, sampling=sampling, time_dim=time_dim)
apriori = self.extend_apriori(data, apriori, time_dim)
# calculate FIR filter coefficients
h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window)
length = len(h)
# start loop on all timestamps
dt = data.coords[time_dim].values
res = xr.zeros_like(data)
print("start iteration")
logging.info("start iteration")
for i in range(0, len(dt)):
t0 = dt[i]
pd_date = pd.to_datetime(t0)
......@@ -135,7 +227,7 @@ class ClimateFIRFilter:
padlen=_padlen, dim=var_dim, window=window, h=h)
res.loc[{time_dim: t0}] = tmp_filter.loc[{time_dim: t0}]
except IndexError:
pass
res.loc[{time_dim: t0}] = np.nan
# if i == 720:
# for var in data.coords[var_dim]:
# data.sel({var_dim: var, time_dim: slice(dt[i_m], dt[i_p+1])}).plot()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment