Skip to content
Snippets Groups Projects
Commit 6f433745 authored by leufen1's avatar leufen1
Browse files

data handler with filters can use kzf coeffs, improved a priori creation

parent 8ea88529
Branches main
No related tags found
5 merge requests!319add all changes of dev into release v1.4.0 branch,!318Resolve "release v1.4.0",!317enabled window_lead_time=1,!295Resolve "data handler FIR filter",!259Draft: Resolve "WRF-Datahandler should inherit from SingleStationDatahandler"
Pipeline #72559 failed
...@@ -48,7 +48,7 @@ DEFAULT_CREATE_NEW_BOOTSTRAPS = False ...@@ -48,7 +48,7 @@ DEFAULT_CREATE_NEW_BOOTSTRAPS = False
DEFAULT_NUMBER_OF_BOOTSTRAPS = 20 DEFAULT_NUMBER_OF_BOOTSTRAPS = 20
DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore", "PlotTimeSeries", DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore", "PlotTimeSeries",
"PlotCompetitiveSkillScore", "PlotBootstrapSkillScore", "PlotConditionalQuantiles", "PlotCompetitiveSkillScore", "PlotBootstrapSkillScore", "PlotConditionalQuantiles",
"PlotAvailability", "PlotAvailabilityHistogram", "PlotDataHistogram"] "PlotAvailability", "PlotAvailabilityHistogram", "PlotDataHistogram", "PlotPeriodogram"]
DEFAULT_SAMPLING = "daily" DEFAULT_SAMPLING = "daily"
DEFAULT_DATA_ORIGIN = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA", DEFAULT_DATA_ORIGIN = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA",
"temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA", "no": "", "no2": "", "o3": "", "temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA", "no": "", "no2": "", "o3": "",
......
...@@ -10,6 +10,7 @@ from mlair.data_handler import DefaultDataHandler ...@@ -10,6 +10,7 @@ from mlair.data_handler import DefaultDataHandler
from mlair import helpers from mlair import helpers
from mlair.helpers import remove_items from mlair.helpers import remove_items
from mlair.configuration.defaults import DEFAULT_SAMPLING, DEFAULT_INTERPOLATION_LIMIT, DEFAULT_INTERPOLATION_METHOD from mlair.configuration.defaults import DEFAULT_SAMPLING, DEFAULT_INTERPOLATION_LIMIT, DEFAULT_INTERPOLATION_METHOD
from mlair.helpers.filter import filter_width_kzf
import inspect import inspect
from typing import Callable from typing import Callable
...@@ -233,6 +234,9 @@ class DataHandlerMixedSamplingWithClimateFirFilterSingleStation(DataHandlerMixed ...@@ -233,6 +234,9 @@ class DataHandlerMixedSamplingWithClimateFirFilterSingleStation(DataHandlerMixed
def estimate_filter_width(self): def estimate_filter_width(self):
"""Filter width is determined by the filter with the highest order.""" """Filter width is determined by the filter with the highest order."""
if isinstance(self.filter_order[0], tuple):
return max([filter_width_kzf(*e) for e in self.filter_order])
else:
return max(self.filter_order) return max(self.filter_order)
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
......
...@@ -14,7 +14,7 @@ from mlair.data_handler.data_handler_single_station import DataHandlerSingleStat ...@@ -14,7 +14,7 @@ from mlair.data_handler.data_handler_single_station import DataHandlerSingleStat
from mlair.data_handler import DefaultDataHandler from mlair.data_handler import DefaultDataHandler
from mlair.helpers import remove_items, to_list, TimeTrackingWrapper from mlair.helpers import remove_items, to_list, TimeTrackingWrapper
from mlair.helpers.filter import KolmogorovZurbenkoFilterMovingWindow as KZFilter from mlair.helpers.filter import KolmogorovZurbenkoFilterMovingWindow as KZFilter
from mlair.helpers.filter import FIRFilter, ClimateFIRFilter from mlair.helpers.filter import FIRFilter, ClimateFIRFilter, omega_null_kzf
# define a more general date type for type hinting # define a more general date type for type hinting
str_or_list = Union[str, List[str]] str_or_list = Union[str, List[str]]
...@@ -152,6 +152,8 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation): ...@@ -152,6 +152,8 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
# self._check_sampling(**kwargs) # self._check_sampling(**kwargs)
# self.original_data = None # ToDo: implement here something to store unfiltered data # self.original_data = None # ToDo: implement here something to store unfiltered data
self.fs = self._get_fs(**kwargs) self.fs = self._get_fs(**kwargs)
if filter_window_type == "kzf":
filter_cutoff_period = self._get_kzf_cutoff_period(filter_order, self.fs)
self.filter_cutoff_period, removed_index = self._prepare_filter_cutoff_period(filter_cutoff_period, self.fs) self.filter_cutoff_period, removed_index = self._prepare_filter_cutoff_period(filter_cutoff_period, self.fs)
self.filter_cutoff_freq = self._period_to_freq(self.filter_cutoff_period) self.filter_cutoff_freq = self._period_to_freq(self.filter_cutoff_period)
assert len(self.filter_cutoff_period) == (len(filter_order) - len(removed_index)) assert len(self.filter_cutoff_period) == (len(filter_order) - len(removed_index))
...@@ -165,6 +167,9 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation): ...@@ -165,6 +167,9 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
order = [] order = []
for i, o in enumerate(filter_order): for i, o in enumerate(filter_order):
if i not in removed_index: if i not in removed_index:
if isinstance(o, tuple):
fo = (o[0] * fs, o[1])
else:
fo = int(o * fs) fo = int(o * fs)
fo = fo + 1 if fo % 2 == 0 else fo fo = fo + 1 if fo % 2 == 0 else fo
order.append(fo) order.append(fo)
...@@ -185,6 +190,14 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation): ...@@ -185,6 +190,14 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
removed.append(i) removed.append(i)
return cutoff, removed return cutoff, removed
@staticmethod
def _get_kzf_cutoff_period(kzf_settings, fs):
cutoff = []
for (m, k) in kzf_settings:
w0 = omega_null_kzf(m * fs, k) * fs
cutoff.append(1. / w0)
return cutoff
@staticmethod @staticmethod
def _period_to_freq(cutoff_p): def _period_to_freq(cutoff_p):
return list(map(lambda x: (1. / x[0] if x[0] is not None else None, 1. / x[1] if x[1] is not None else None), return list(map(lambda x: (1. / x[0] if x[0] is not None else None, 1. / x[1] if x[1] is not None else None),
...@@ -325,7 +338,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation ...@@ -325,7 +338,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
0 for all residuum components. 0 for all residuum components.
:param apriori: Data to use as apriori information. This should be either a xarray dataarray containing monthly or :param apriori: Data to use as apriori information. This should be either a xarray dataarray containing monthly or
any other heuristic to support the clim filter, or a list of such arrays containint heuristics for all residua any other heuristic to support the clim filter, or a list of such arrays containing heuristics for all residua
in addition. The 2nd can be used together with apriori_type `residuum_stats` which estimates the error of the in addition. The 2nd can be used together with apriori_type `residuum_stats` which estimates the error of the
residuum when the clim filter should be applied with exogenous parameters. If apriori_type is None/`zeros` data residuum when the clim filter should be applied with exogenous parameters. If apriori_type is None/`zeros` data
can be provided, but this is not required in this case. can be provided, but this is not required in this case.
...@@ -341,7 +354,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation ...@@ -341,7 +354,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
_store_attributes = DataHandlerFirFilterSingleStation.store_attributes() + ["apriori"] _store_attributes = DataHandlerFirFilterSingleStation.store_attributes() + ["apriori"]
def __init__(self, *args, apriori=None, apriori_type=None, apriori_diurnal=False, apriori_sel_opts=None, def __init__(self, *args, apriori=None, apriori_type=None, apriori_diurnal=False, apriori_sel_opts=None,
plot_path=None, **kwargs): plot_path=None, name_affix=None, **kwargs):
self.apriori_type = apriori_type self.apriori_type = apriori_type
self.climate_filter_coeff = None # coefficents of the used FIR filter self.climate_filter_coeff = None # coefficents of the used FIR filter
self.apriori = apriori # exogenous apriori information or None to calculate from data (endogenous) self.apriori = apriori # exogenous apriori information or None to calculate from data (endogenous)
...@@ -349,6 +362,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation ...@@ -349,6 +362,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
self.all_apriori = None # collection of all apriori information self.all_apriori = None # collection of all apriori information
self.apriori_sel_opts = apriori_sel_opts # ensure to separate exogenous and endogenous information self.apriori_sel_opts = apriori_sel_opts # ensure to separate exogenous and endogenous information
self.plot_path = plot_path # use this path to create insight plots self.plot_path = plot_path # use this path to create insight plots
self.plot_name_affix = name_affix
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@TimeTrackingWrapper @TimeTrackingWrapper
...@@ -356,12 +370,13 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation ...@@ -356,12 +370,13 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
"""Apply FIR filter only on inputs.""" """Apply FIR filter only on inputs."""
self.apriori = self.apriori.get(str(self)) if isinstance(self.apriori, dict) else self.apriori self.apriori = self.apriori.get(str(self)) if isinstance(self.apriori, dict) else self.apriori
logging.info(f"{self.station}: call ClimateFIRFilter") logging.info(f"{self.station}: call ClimateFIRFilter")
plot_name = str(self) # if self.plot_name_affix is None else f"{str(self)}_{self.plot_name_affix}"
climate_filter = ClimateFIRFilter(self.input_data.astype("float32"), self.fs, self.filter_order, climate_filter = ClimateFIRFilter(self.input_data.astype("float32"), self.fs, self.filter_order,
self.filter_cutoff_freq, self.filter_cutoff_freq,
self.filter_window_type, time_dim=self.time_dim, var_dim=self.target_dim, self.filter_window_type, time_dim=self.time_dim, var_dim=self.target_dim,
apriori_type=self.apriori_type, apriori=self.apriori, apriori_type=self.apriori_type, apriori=self.apriori,
apriori_diurnal=self.apriori_diurnal, sel_opts=self.apriori_sel_opts, apriori_diurnal=self.apriori_diurnal, sel_opts=self.apriori_sel_opts,
plot_path=self.plot_path, plot_name=str(self), plot_path=self.plot_path, plot_name=plot_name,
minimum_length=self.window_history_size, new_dim=self.window_dim) minimum_length=self.window_history_size, new_dim=self.window_dim)
self.climate_filter_coeff = climate_filter.filter_coefficients self.climate_filter_coeff = climate_filter.filter_coefficients
...@@ -374,11 +389,9 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation ...@@ -374,11 +389,9 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
climate_filter_data = [c.sel({self.window_dim: slice(-self.window_history_size, 0)}) for c in climate_filter_data = [c.sel({self.window_dim: slice(-self.window_history_size, 0)}) for c in
climate_filter.filtered_data] climate_filter.filtered_data]
# climate_filter_data = climate_filter.filtered_data
# create input data with filter index # create input data with filter index
input_data = xr.concat(climate_filter_data, pd.Index(self.create_filter_index(), name=self.filter_dim)) input_data = xr.concat(climate_filter_data, pd.Index(self.create_filter_index(), name=self.filter_dim))
# self.input_data = xr.concat([c.sel(window=slice(-self.window_history_size, 0)) for c in climate_filter_data], pd.Index(self.create_filter_index(), name=self.filter_dim))
# add unfiltered raw data # add unfiltered raw data
if self._add_unfiltered is True: if self._add_unfiltered is True:
...@@ -388,7 +401,6 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation ...@@ -388,7 +401,6 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
self.input_data = input_data self.input_data = input_data
# self.history = self.shift(data, dim_name_of_shift, window, offset=self.window_history_offset)
# this is just a code snippet to check the results of the filter # this is just a code snippet to check the results of the filter
# import matplotlib # import matplotlib
# matplotlib.use("TkAgg") # matplotlib.use("TkAgg")
...@@ -421,16 +433,6 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation ...@@ -421,16 +433,6 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
self.filter_dim_order = lazy_data self.filter_dim_order = lazy_data
DataHandlerSingleStation._extract_lazy(self, (_data, _meta, _input_data, _target_data)) DataHandlerSingleStation._extract_lazy(self, (_data, _meta, _input_data, _target_data))
@staticmethod
def _prepare_filter_order(filter_order, removed_index, fs):
order = []
for i, o in enumerate(filter_order):
if i not in removed_index:
fo = int(o * fs)
fo = fo + 1 if fo % 2 == 0 else fo
order.append(fo)
return order
@staticmethod @staticmethod
def _prepare_filter_cutoff_period(filter_cutoff_period, fs): def _prepare_filter_cutoff_period(filter_cutoff_period, fs):
"""Frequency must be smaller than the sampling frequency fs. Otherwise remove given cutoff period pair.""" """Frequency must be smaller than the sampling frequency fs. Otherwise remove given cutoff period pair."""
......
This diff is collapsed.
...@@ -9,7 +9,7 @@ import numpy as np ...@@ -9,7 +9,7 @@ import numpy as np
import xarray as xr import xarray as xr
import dask.array as da import dask.array as da
from typing import Dict, Callable, Union, List, Any from typing import Dict, Callable, Union, List, Any, Tuple
def to_list(obj: Any) -> List: def to_list(obj: Any) -> List:
...@@ -68,9 +68,9 @@ def float_round(number: float, decimals: int = 0, round_type: Callable = math.ce ...@@ -68,9 +68,9 @@ def float_round(number: float, decimals: int = 0, round_type: Callable = math.ce
return round_type(number * multiplier) / multiplier return round_type(number * multiplier) / multiplier
def remove_items(obj: Union[List, Dict], items: Any): def remove_items(obj: Union[List, Dict, Tuple], items: Any):
""" """
Remove item(s) from either list or dictionary. Remove item(s) from either list, tuple or dictionary.
:param obj: object to remove items from (either dictionary or list) :param obj: object to remove items from (either dictionary or list)
:param items: elements to remove from obj. Can either be a list or single entry / key :param items: elements to remove from obj. Can either be a list or single entry / key
...@@ -99,6 +99,8 @@ def remove_items(obj: Union[List, Dict], items: Any): ...@@ -99,6 +99,8 @@ def remove_items(obj: Union[List, Dict], items: Any):
return remove_from_list(obj, items) return remove_from_list(obj, items)
elif isinstance(obj, dict): elif isinstance(obj, dict):
return remove_from_dict(obj, items) return remove_from_dict(obj, items)
elif isinstance(obj, tuple):
return tuple(remove_from_list(to_list(obj), items))
else: else:
raise TypeError(f"{inspect.stack()[0][3]} does not support type {type(obj)}.") raise TypeError(f"{inspect.stack()[0][3]} does not support type {type(obj)}.")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment