diff --git a/mlair/configuration/defaults.py b/mlair/configuration/defaults.py index bfbef52180b5c8156acae0b7005e8ad984ae1cba..088a504ab623198f0cc37815717a7ce291dda461 100644 --- a/mlair/configuration/defaults.py +++ b/mlair/configuration/defaults.py @@ -48,7 +48,7 @@ DEFAULT_CREATE_NEW_BOOTSTRAPS = False DEFAULT_NUMBER_OF_BOOTSTRAPS = 20 DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore", "PlotTimeSeries", "PlotCompetitiveSkillScore", "PlotBootstrapSkillScore", "PlotConditionalQuantiles", - "PlotAvailability", "PlotAvailabilityHistogram", "PlotDataHistogram"] + "PlotAvailability", "PlotAvailabilityHistogram", "PlotDataHistogram", "PlotPeriodogram"] DEFAULT_SAMPLING = "daily" DEFAULT_DATA_ORIGIN = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA", "temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA", "no": "", "no2": "", "o3": "", diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index 62a354a20c16092229326ffac61fe29e972785b7..8205ae6c28f3683b1052c292e5d063d8bca555dc 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -10,6 +10,7 @@ from mlair.data_handler import DefaultDataHandler from mlair import helpers from mlair.helpers import remove_items from mlair.configuration.defaults import DEFAULT_SAMPLING, DEFAULT_INTERPOLATION_LIMIT, DEFAULT_INTERPOLATION_METHOD +from mlair.helpers.filter import filter_width_kzf import inspect from typing import Callable @@ -233,7 +234,10 @@ class DataHandlerMixedSamplingWithClimateFirFilterSingleStation(DataHandlerMixed def estimate_filter_width(self): """Filter width is determined by the filter with the highest order.""" - return max(self.filter_order) + if isinstance(self.filter_order[0], tuple): + return max([filter_width_kzf(*e) for e in self.filter_order]) + else: + return max(self.filter_order) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py index 5da1b8930642954549b7aa78f63f80073a4d843a..80253b0a5330926fb123bf032772f5d063267213 100644 --- a/mlair/data_handler/data_handler_with_filter.py +++ b/mlair/data_handler/data_handler_with_filter.py @@ -14,7 +14,7 @@ from mlair.data_handler.data_handler_single_station import DataHandlerSingleStat from mlair.data_handler import DefaultDataHandler from mlair.helpers import remove_items, to_list, TimeTrackingWrapper from mlair.helpers.filter import KolmogorovZurbenkoFilterMovingWindow as KZFilter -from mlair.helpers.filter import FIRFilter, ClimateFIRFilter +from mlair.helpers.filter import FIRFilter, ClimateFIRFilter, omega_null_kzf # define a more general date type for type hinting str_or_list = Union[str, List[str]] @@ -152,6 +152,8 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation): # self._check_sampling(**kwargs) # self.original_data = None # ToDo: implement here something to store unfiltered data self.fs = self._get_fs(**kwargs) + if filter_window_type == "kzf": + filter_cutoff_period = self._get_kzf_cutoff_period(filter_order, self.fs) self.filter_cutoff_period, removed_index = self._prepare_filter_cutoff_period(filter_cutoff_period, self.fs) self.filter_cutoff_freq = self._period_to_freq(self.filter_cutoff_period) assert len(self.filter_cutoff_period) == (len(filter_order) - len(removed_index)) @@ -165,8 +167,11 @@ class 
DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation): order = [] for i, o in enumerate(filter_order): if i not in removed_index: - fo = int(o * fs) - fo = fo + 1 if fo % 2 == 0 else fo + if isinstance(o, tuple): + fo = (o[0] * fs, o[1]) + else: + fo = int(o * fs) + fo = fo + 1 if fo % 2 == 0 else fo order.append(fo) return order @@ -185,6 +190,14 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation): removed.append(i) return cutoff, removed + @staticmethod + def _get_kzf_cutoff_period(kzf_settings, fs): + cutoff = [] + for (m, k) in kzf_settings: + w0 = omega_null_kzf(m * fs, k) * fs + cutoff.append(1. / w0) + return cutoff + @staticmethod def _period_to_freq(cutoff_p): return list(map(lambda x: (1. / x[0] if x[0] is not None else None, 1. / x[1] if x[1] is not None else None), @@ -325,7 +338,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation 0 for all residuum components. :param apriori: Data to use as apriori information. This should be either a xarray dataarray containing monthly or - any other heuristic to support the clim filter, or a list of such arrays containint heuristics for all residua + any other heuristic to support the clim filter, or a list of such arrays containing heuristics for all residua in addition. The 2nd can be used together with apriori_type `residuum_stats` which estimates the error of the residuum when the clim filter should be applied with exogenous parameters. If apriori_type is None/`zeros` data can be provided, but this is not required in this case. @@ -341,7 +354,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation _store_attributes = DataHandlerFirFilterSingleStation.store_attributes() + ["apriori"] def __init__(self, *args, apriori=None, apriori_type=None, apriori_diurnal=False, apriori_sel_opts=None, - plot_path=None, **kwargs): + plot_path=None, name_affix=None, **kwargs): self.apriori_type = apriori_type self.climate_filter_coeff = None # coefficents of the used FIR filter self.apriori = apriori # exogenous apriori information or None to calculate from data (endogenous) @@ -349,6 +362,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation self.all_apriori = None # collection of all apriori information self.apriori_sel_opts = apriori_sel_opts # ensure to separate exogenous and endogenous information self.plot_path = plot_path # use this path to create insight plots + self.plot_name_affix = name_affix super().__init__(*args, **kwargs) @TimeTrackingWrapper @@ -356,12 +370,13 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation """Apply FIR filter only on inputs.""" self.apriori = self.apriori.get(str(self)) if isinstance(self.apriori, dict) else self.apriori logging.info(f"{self.station}: call ClimateFIRFilter") + plot_name = str(self) # if self.plot_name_affix is None else f"{str(self)}_{self.plot_name_affix}" climate_filter = ClimateFIRFilter(self.input_data.astype("float32"), self.fs, self.filter_order, self.filter_cutoff_freq, self.filter_window_type, time_dim=self.time_dim, var_dim=self.target_dim, apriori_type=self.apriori_type, apriori=self.apriori, apriori_diurnal=self.apriori_diurnal, sel_opts=self.apriori_sel_opts, - plot_path=self.plot_path, plot_name=str(self), + plot_path=self.plot_path, plot_name=plot_name, minimum_length=self.window_history_size, new_dim=self.window_dim) self.climate_filter_coeff = climate_filter.filter_coefficients @@ -374,11 +389,9 @@ class 
DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation climate_filter_data = [c.sel({self.window_dim: slice(-self.window_history_size, 0)}) for c in climate_filter.filtered_data] - # climate_filter_data = climate_filter.filtered_data # create input data with filter index input_data = xr.concat(climate_filter_data, pd.Index(self.create_filter_index(), name=self.filter_dim)) - # self.input_data = xr.concat([c.sel(window=slice(-self.window_history_size, 0)) for c in climate_filter_data], pd.Index(self.create_filter_index(), name=self.filter_dim)) # add unfiltered raw data if self._add_unfiltered is True: @@ -388,7 +401,6 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation self.input_data = input_data - # self.history = self.shift(data, dim_name_of_shift, window, offset=self.window_history_offset) # this is just a code snippet to check the results of the filter # import matplotlib # matplotlib.use("TkAgg") @@ -421,16 +433,6 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation self.filter_dim_order = lazy_data DataHandlerSingleStation._extract_lazy(self, (_data, _meta, _input_data, _target_data)) - @staticmethod - def _prepare_filter_order(filter_order, removed_index, fs): - order = [] - for i, o in enumerate(filter_order): - if i not in removed_index: - fo = int(o * fs) - fo = fo + 1 if fo % 2 == 0 else fo - order.append(fo) - return order - @staticmethod def _prepare_filter_cutoff_period(filter_cutoff_period, fs): """Frequency must be smaller than the sampling frequency fs. Otherwise remove given cutoff period pair.""" diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index 9662122b8d2eb9835d83883290cedfbc1e639e35..a551bec4220b01012c6d6962e6dfee135fbe5d6a 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -1,6 +1,6 @@ import gc import warnings -from typing import Union, Callable +from typing import Union, Callable, Tuple import logging import os import time @@ -55,10 +55,11 @@ class FIRFilter: class ClimateFIRFilter: + from mlair.plotting.data_insight_plotting import PlotClimateFirFilter def __init__(self, data, fs, order, cutoff, window, time_dim, var_dim, apriori=None, apriori_type=None, - apriori_diurnal=False, sel_opts=None, plot_path=None, plot_name=None, vectorized=True, - padlen_factor=0.8, minimum_length=None, new_dim=None): + apriori_diurnal=False, sel_opts=None, plot_path=None, plot_name=None, + minimum_length=None, new_dim=None): """ :param data: data to filter :param fs: sampling frequency in 1/days -> 1d: fs=1 -> 1H: fs=24 @@ -81,9 +82,10 @@ class ClimateFIRFilter: self.plot_data = [] filtered = [] h = [] - sel_opts = sel_opts if isinstance(sel_opts, dict) else {time_dim: sel_opts} + if sel_opts is not None: + sel_opts = sel_opts if isinstance(sel_opts, dict) else {time_dim: sel_opts} sampling = {1: "1d", 24: "1H"}.get(int(fs)) - logging.info(f"{plot_name}: create diurnal_anomalies") + logging.debug(f"{plot_name}: create diurnal_anomalies") if apriori_diurnal is True and sampling == "1H": # diurnal_anomalies = self.create_hourly_mean(data, sel_opts=sel_opts, sampling=sampling, time_dim=time_dim, # as_anomaly=True) @@ -92,11 +94,11 @@ class ClimateFIRFilter: as_anomaly=True) else: diurnal_anomalies = 0 - logging.info(f"{plot_name}: create monthly apriori") + logging.debug(f"{plot_name}: create monthly apriori") if apriori is None: apriori = self.create_monthly_mean(data, sel_opts=sel_opts, sampling=sampling, time_dim=time_dim) + diurnal_anomalies - 
logging.info(f"{plot_name}: apriori shape = {apriori.shape}") + logging.debug(f"{plot_name}: apriori shape = {apriori.shape}") apriori_list = to_list(apriori) input_data = data.__deepcopy__() @@ -109,17 +111,14 @@ class ClimateFIRFilter: for i in range(len(order)): logging.info(f"{plot_name}: start filter for order {order[i]}") # calculate climatological filter - # clim_filter: Callable = {True: self.clim_filter_vectorized, False: self.clim_filter}[vectorized] # ToDo: remove all methods except the vectorized version - clim_filter: Callable = {True: self.clim_filter_vectorized_less_memory, False: self.clim_filter}[vectorized] - _minimum_length = self._minimum_length(order, minimum_length, i) - fi, hi, apriori, plot_data = clim_filter(input_data, fs, cutoff[i], order[i], - apriori=apriori_list[i], - sel_opts=sel_opts, sampling=sampling, time_dim=time_dim, - window=window, - var_dim=var_dim, plot_index=i, padlen_factor=padlen_factor, - minimum_length=_minimum_length, new_dim=new_dim, - plot_dates=plot_dates) + _minimum_length = self._minimum_length(order, minimum_length, i, window) + fi, hi, apriori, plot_data = self.clim_filter(input_data, fs, cutoff[i], order[i], + apriori=apriori_list[i], + sel_opts=sel_opts, sampling=sampling, time_dim=time_dim, + window=window, var_dim=var_dim, + minimum_length=_minimum_length, new_dim=new_dim, + plot_dates=plot_dates) logging.info(f"{plot_name}: finished clim_filter calculation") if minimum_length is None: @@ -173,13 +172,17 @@ class ClimateFIRFilter: self._apriori = apriori_list # visualize - self._plot(sampling) + if self.plot_path is not None: + self.PlotClimateFirFilter(self.plot_path, self.plot_data, sampling, plot_name) + # self._plot(sampling, new_dim=new_dim) @staticmethod - def _minimum_length(order, minimum_length, pos): + def _minimum_length(order, minimum_length, pos, window): next_order = 0 if pos + 1 < len(order): next_order = order[pos + 1] + if window == "kzf" and isinstance(next_order, tuple): + next_order = filter_width_kzf(*next_order) if minimum_length is not None: next_order = next_order + minimum_length return next_order if next_order > 0 else None @@ -290,283 +293,61 @@ class ClimateFIRFilter: """ Extend time range of apriori information. - This method will fail, if apriori is available for a shorter period than the gab to fill. + This method may not working properly if length of apriori is less then one year. 
""" dates = data.coords[time_dim].values td_type = {"1d": "D", "1H": "h"}.get(sampling) # apriori starts after data if dates[0] < apriori.coords[time_dim].values[0]: - logging.info(f"{data.coords['Stations'].values[0]}: apriori starts after data") + logging.debug(f"{data.coords['Stations'].values[0]}: apriori starts after data") + # add difference in full years date_diff = abs(dates[0] - apriori.coords[time_dim].values[0]).astype("timedelta64[D]") extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365 - coords = apriori.coords + factor = 1 if td_type == "D" else 24 - # create new time axis - # start = coords[time_dim][0].values.astype("datetime64[%s]" % td_type) - np.timedelta64(extend_range, "D") - # end = coords[time_dim][0].values.astype("datetime64[%s]" % td_type) - # new_time_axis = np.arange(start, end).astype("datetime64[ns]") + # get fill data range + start = apriori.coords[time_dim][0].values.astype("datetime64[%s]" % td_type) + end = apriori.coords[time_dim][0].values.astype("datetime64[%s]" % td_type) + np.timedelta64( + 366 * factor + 1, td_type) - factor = 1 if td_type == "D" else 24 - start = coords[time_dim][0].values.astype("datetime64[%s]" % td_type) - np.timedelta64( - extend_range * factor + 1, - td_type) - end = coords[time_dim][0].values.astype("datetime64[%s]" % td_type) - new_time_axis = np.arange(start, end).astype("datetime64[ns]") - logging.info(f"{data.coords['Stations'].values[0]}: shape of new_time_axis = {new_time_axis.shape}") - - # extract old values to use with new axis - # start = coords[time_dim][0].values.astype("datetime64[D]") - # end = coords[time_dim][0].values.astype("datetime64[D]") + np.timedelta64(extend_range - 1, "D") - # new_values = apriori.sel({time_dim: slice(start, end)}) - # new_values.coords[time_dim] = new_time_axis - - start = coords[time_dim][0].values.astype("datetime64[%s]" % td_type) - end = coords[time_dim][0].values.astype("datetime64[%s]" % td_type) + np.timedelta64( - extend_range * factor - 1, td_type) - new_values = apriori.sel({time_dim: slice(start, end)}) - logging.info(f"{data.coords['Stations'].values[0]}: shape of new_values = {new_values.shape}") - new_values.coords[time_dim] = new_time_axis - - # add new values to apriori - apriori = apriori.combine_first(new_values) + # fill year by year + for i in range(365, extend_range + 365, 365): + apriori_tmp = apriori.sel({time_dim: slice(start, end)}) # hint: slice includes end date + new_time_axis = apriori_tmp.coords[time_dim] - np.timedelta64(i * factor, td_type) + apriori_tmp.coords[time_dim] = new_time_axis + apriori = apriori.combine_first(apriori_tmp) # apriori ends before data if dates[-1] + np.timedelta64(365, "D") > apriori.coords[time_dim].values[-1]: - logging.info(f"{data.coords['Stations'].values[0]}: apriori ends before data") + logging.debug(f"{data.coords['Stations'].values[0]}: apriori ends before data") + # add difference in full years + 1 year (because apriori is used as future estimate) date_diff = abs(dates[-1] - apriori.coords[time_dim].values[-1]).astype("timedelta64[D]") extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365 + 365 - coords = apriori.coords - - # create new time axis factor = 1 if td_type == "D" else 24 - start = coords[time_dim][-1].values.astype("datetime64[%s]" % td_type) - end = coords[time_dim][-1].values.astype("datetime64[%s]" % td_type) + np.timedelta64( - extend_range * factor + 1, - td_type) - new_time_axis = np.arange(start, end).astype("datetime64[ns]") # hint: 
arange does not include end date - logging.info(f"{data.coords['Stations'].values[0]}: shape of new_time_axis = {new_time_axis.shape}") - logging.info(f"{data.coords['Stations'].values[0]}: start of new_time_axis = {start}") - logging.info(f"{data.coords['Stations'].values[0]}: end of new_time_axis = {end}") - logging.info(f"{data.coords['Stations'].values[0]}: delta of new_time_axis = {end - start}") - - # extract old values to use with new axis - start = coords[time_dim][-1].values.astype("datetime64[%s]" % td_type) - np.timedelta64( - extend_range * factor, td_type) - # start = coords[time_dim][-1].values.astype("datetime64[%s]" % td_type) - np.timedelta64( - # extend_range * factor, td_type) - end = coords[time_dim][-1].values.astype("datetime64[%s]" % td_type) - new_values = apriori.sel({time_dim: slice(start, end)}) # hint: slice includes end date - logging.info(f"{data.coords['Stations'].values[0]}: shape of new_values = {new_values.shape}") - logging.info(f"{data.coords['Stations'].values[0]}: start of new_values = {start}") - logging.info(f"{data.coords['Stations'].values[0]}: end of new_values = {end}") - logging.info(f"{data.coords['Stations'].values[0]}: delta of new_values = {end - start}") - - logging.info(f"{data.coords['Stations'].values[0]}: set new_time_axis") - new_values.coords[time_dim] = new_time_axis - - # add new values to apriori - logging.info(f"{data.coords['Stations'].values[0]}: add to apriori") - apriori = apriori.combine_first(new_values) - return apriori + # get fill data range + start = apriori.coords[time_dim][-1].values.astype("datetime64[%s]" % td_type) - np.timedelta64( + 366 * factor + 1, td_type) + end = apriori.coords[time_dim][-1].values.astype("datetime64[%s]" % td_type) - @TimeTrackingWrapper - def clim_filter(self, data, fs, cutoff_high, order, apriori=None, padlen_factor=0.5, sel_opts=None, sampling="1d", - time_dim="datetime", var_dim="variables", window="hamming", plot_index=None): - - # calculate apriori information from data if not given and extend its range if not sufficient long enough - if apriori is None: - apriori = self.create_monthly_mean(data, sel_opts=sel_opts, sampling=sampling, time_dim=time_dim) - apriori = self.extend_apriori(data, apriori, time_dim, sampling) - - # calculate FIR filter coefficients - h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window) - length = len(h) + # fill year by year + for i in range(365, extend_range + 365, 365): + apriori_tmp = apriori.sel({time_dim: slice(start, end)}) # hint: slice includes end date + new_time_axis = apriori_tmp.coords[time_dim] + np.timedelta64(i * factor, td_type) + apriori_tmp.coords[time_dim] = new_time_axis + apriori = apriori.combine_first(apriori_tmp) - # start loop on all timestamps - dt = data.coords[time_dim].values - res = xr.zeros_like(data) - logging.info("start iteration") - for i in range(0, len(dt)): - t0 = dt[i] - pd_date = pd.to_datetime(t0) - if pd_date.day == 1 and pd_date.month == 1: - print(t0) - try: - i_m = max(0, i - length) - i_p = min(i + length, len(dt) - 2) - t_hist = slice(dt[i_m], dt[i]) - t_fut = slice(dt[i + 1], dt[i_p + 1]) - tmp_hist = data.sel({time_dim: t_hist}) - tmp_fut = apriori.sel({time_dim: t_fut}) - tmp_comb = xr.concat([tmp_hist, tmp_fut], dim=time_dim) - _padlen = int(min(padlen_factor, 1) * len(tmp_comb.coords[time_dim])) - tmp_filter, _ = fir_filter(tmp_comb, fs, cutoff_high=cutoff_high, order=order, causal=False, - padlen=_padlen, dim=var_dim, window=window, h=h) - res.loc[{time_dim: t0}] = 
tmp_filter.loc[{time_dim: t0}] - if i == 720 and self.plot_path is not None: - self.plot(data, tmp_comb, var_dim, time_dim, slice(dt[i_m], dt[i_p + 1]), t0, plot_index) - except IndexError: - res.loc[{time_dim: t0}] = np.nan - return res, h, apriori - - @TimeTrackingWrapper - def clim_filter_vectorized(self, data, fs, cutoff_high, order, apriori=None, padlen_factor=0.5, sel_opts=None, - sampling="1d", time_dim="datetime", var_dim="variables", window="hamming", - plot_index=None): - - # calculate apriori information from data if not given and extend its range if not sufficient long enough - if apriori is None: - apriori = self.create_monthly_mean(data, sel_opts=sel_opts, sampling=sampling, time_dim=time_dim) - apriori = self.extend_apriori(data, apriori, time_dim, sampling) - - # calculate FIR filter coefficients - h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window) - length = len(h) - - # create tmp dimension to apply filter, search for unused name - new_dim = self._create_tmp_dimension(data) - - # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] - history = self._shift_data(data, range(int(-(length - 1) / 2), 1), time_dim, var_dim, new_dim) - future = self._shift_data(apriori, range(1, int((length - 1) / 2) + 1), time_dim, var_dim, new_dim) - filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") - # filter_input_data = history.combine_first(future) - # history.sel(datetime=slice("2010-11-01", "2011-04-01"),variables="o3").plot() - # filter_input_data.sel(datetime=slice("2009-11-01", "2011-04-01"),variables="temp").plot() - - time_axis = filter_input_data.coords[time_dim] - # apply vectorized fir filter along the tmp dimension - kwargs = {"fs": fs, "cutoff_high": cutoff_high, "order": order, - "causal": False, "padlen": int(min(padlen_factor, 1) * length), "h": h} - # with TimeTracking(name="numpy_vec"): - # filt = fir_filter_numpy_vectorized(filter_input_data, var_dim, new_dim, kwargs) - # with TimeTracking(name="xr_apply_ufunc"): - # filt = xr.apply_ufunc(fir_filter_vectorized, filter_input_data, time_axis, - # input_core_dims=[[new_dim], []], output_core_dims=[[new_dim]], vectorize=True, - # kwargs=kwargs) - with TimeTracking(name="convolve"): - slicer = slice(int(-(length - 1) / 2), int((length - 1) / 2)) - filt = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data.sel({new_dim: slicer}), - input_core_dims=[[new_dim]], - output_core_dims=[[new_dim]], - vectorize=True, - kwargs={"h": h}) - - # plot - if self.plot_path is not None: - for i, time_pos in enumerate([0.25, 1.5, 2.75, 4]): # [0.25, 1.5, 2.75, 4] x 365 days - try: - pos = int(time_pos * 365 * fs) - filter_example = filter_input_data.isel({time_dim: pos}) - t0 = filter_example.coords[time_dim].values - t_slice = filter_input_data.isel( - {time_dim: slice(pos - int((length - 1) / 2), pos + int((length - 1) / 2) + 1)}).coords[ - time_dim].values - self.plot(data, filter_example, var_dim, time_dim, t_slice, t0, f"{plot_index}_{i}") - except IndexError: - pass - - # select only values at tmp dimension 0 at each point in time - res = filt.sel({new_dim: 0}, drop=True) - # create result array with same shape like input data, gabs are filled by nans - res_full = xr.ones_like(data) * np.nan - res_full.loc[res.coords] = res - return res_full, h, apriori - - def _tmp_analysis(self, data, apriori, var, var_dim, length, time_dim, new_dim, h): - logging.info(f"{data.coords['Stations'].values[0]} ({var}): sel 
data") - d = data.sel({var_dim: [var]}).sel(datetime=slice("2007", "2010")) - a = apriori.sel({var_dim: [var]}).sel(datetime=slice("2007", "2010")) - - # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] - history = self._shift_data(d, range(-length, 1), time_dim, var_dim, new_dim) - - future = self._shift_data(d, range(1, length), time_dim, var_dim, new_dim) - filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") - logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve") - with TimeTracking(name="convolve"): - filt_nc = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data, - input_core_dims=[[new_dim]], - output_core_dims=[[new_dim]], - vectorize=True, - kwargs={"h": h}) - - future = self._shift_data(a, range(1, length), time_dim, var_dim, new_dim) - filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") - logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve") - with TimeTracking(name="convolve"): - filt_t0 = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data, - input_core_dims=[[new_dim]], - output_core_dims=[[new_dim]], - vectorize=True, - kwargs={"h": h}) - - diff = (a - history.sel(window=slice(-24, 1)).mean(new_dim)) - future = self._shift_data(a, range(1, length), time_dim, var_dim, new_dim) - diff - filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") - logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve") - with TimeTracking(name="convolve"): - filt_diff1d = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data, - input_core_dims=[[new_dim]], - output_core_dims=[[new_dim]], - vectorize=True, - kwargs={"h": h}) - - diff = (a - history.sel(window=slice(-24 * 7, 1)).mean(new_dim)) - future = self._shift_data(a, range(1, length), time_dim, var_dim, new_dim) - diff - filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") - logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve") - with TimeTracking(name="convolve"): - filt_diff1w = xr.apply_ufunc(fir_filter_convolve_vectorized, - filter_input_data, - input_core_dims=[[new_dim]], - output_core_dims=[[new_dim]], - vectorize=True, - kwargs={"h": h}) - - diff = (a - history.sel(window=slice(-24 * 7, 1)).mean(new_dim)) - future = self._shift_data(a, range(1, length), time_dim, var_dim, new_dim) - diff = xr.zeros_like(future) + diff - lam = np.log(2) / (7 * 24) - diff = diff * np.exp(- lam * diff.coords["window"]) - future = future - diff - filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") - logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve") - with TimeTracking(name="convolve"): - filt_diff1w_decay = xr.apply_ufunc(fir_filter_convolve_vectorized, - filter_input_data, - input_core_dims=[[new_dim]], - output_core_dims=[[new_dim]], - vectorize=True, - kwargs={"h": h}) - - t0 = datetime.datetime.strptime("2009-07-15 00:00", "%Y-%m-%d %H:%M") - delta = datetime.timedelta(hours=1) - for i in range(int((length - 1) / 2)): - plt.plot(-i, filt_nc.sel(datetime=t0 - i * delta, window=0), marker="+", color="black") - filt_nc.sel(datetime=t0).plot(label="noncausal") - filt_t0.sel(datetime=t0).plot(label="nodiff") - filt_diff1d.sel(datetime=t0).plot(label="diff1d") - filt_diff1w.sel(datetime=t0).plot(label="diff1w") - 
filt_diff1w_decay.sel(datetime=t0).plot(label="diff1wdecay") - plt.legend() - - for i in range(int((length - 1) / 2)): - plt.plot(-i, filt_t0.sel(datetime=t0 - i * delta, window=0), marker="+", color="black") - - z = 1 + return apriori @TimeTrackingWrapper - def clim_filter_vectorized_less_memory(self, data, fs, cutoff_high, order, apriori=None, padlen_factor=0.5, - sel_opts=None, - sampling="1d", time_dim="datetime", var_dim="variables", window="hamming", - plot_index=None, minimum_length=None, new_dim="window", plot_dates=None): + def clim_filter(self, data, fs, cutoff_high, order, apriori=None, sel_opts=None, + sampling="1d", time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming", + minimum_length=None, new_dim="window", plot_dates=None): - logging.info(f"{data.coords['Stations'].values[0]}: extend apriori") + logging.debug(f"{data.coords['Stations'].values[0]}: extend apriori") # calculate apriori information from data if not given and extend its range if not sufficient long enough if apriori is None: @@ -575,12 +356,12 @@ class ClimateFIRFilter: apriori = self.extend_apriori(data, apriori, time_dim, sampling) # calculate FIR filter coefficients - h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window) + if window == "kzf": + h = firwin_kzf(*order) + else: + h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window) length = len(h) - # create tmp dimension to apply filter, search for unused name - # new_dim = self._create_tmp_dimension(data) - # use filter length if no minimum is given, otherwise use minimum + half filter length for extension extend_length_history = length if minimum_length is None else minimum_length + int((length + 1) / 2) extend_length_future = int((length + 1) / 2) + 1 @@ -595,7 +376,6 @@ class ClimateFIRFilter: coll = [] for var in reversed(data.coords[var_dim].values): - # self._tmp_analysis(data, apriori, var, var_dim, length, time_dim, new_dim, h) logging.info(f"{data.coords['Stations'].values[0]} ({var}): sel data") _start = pd.to_datetime(data.coords[time_dim].min().values).year @@ -611,20 +391,14 @@ class ClimateFIRFilter: continue # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] - # logging.info(f"{data.coords['Stations'].values[0]} ({var}): history") if new_dim not in d.coords: history = self._shift_data(d, range(int(-extend_length_history), 1), time_dim, var_dim, new_dim) else: history = d.sel({new_dim: slice(int(-extend_length_history), 0)}) - # logging.info(f"{data.coords['Stations'].values[0]} ({var}): future") - # diff = (a - history.sel(window=slice(-24, 1)).mean(new_dim)) if new_dim not in a.coords: future = self._shift_data(a, range(1, extend_length_future), time_dim, var_dim, new_dim) - # future = self._shift_data(a, range(1, int((length - 1) / 2) + 1), time_dim, var_dim, new_dim) - diff else: future = a.sel({new_dim: slice(1, extend_length_future)}) - # logging.info(f"{data.coords['Stations'].values[0]} ({var}): concat to filter input") - filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") try: filter_input_data = filter_input_data.sel({time_dim: str(_year)}) @@ -633,10 +407,9 @@ class ClimateFIRFilter: if len(filter_input_data.coords[time_dim]) == 0: # no valid data for this year continue - logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve") - with TimeTracking(name="convolve"): - # slicer = slice(int(-(length - 1) / 2), int((length - 1) / 2)) - filt = 
xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data, # .sel({new_dim: slicer}), + logging.debug(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve") + with TimeTracking(name=f"{data.coords['Stations'].values[0]} ({var}): filter convolve"): + filt = xr.apply_ufunc(fir_filter_convolve, filter_input_data, input_core_dims=[[new_dim]], output_core_dims=[[new_dim]], vectorize=True, @@ -681,31 +454,10 @@ class ClimateFIRFilter: coll.append(xr.concat(filt_coll, time_dim)) gc.collect() - # plot - # ToDo: enable plotting again - # if self.plot_path is not None: - # for i, viz_data in enumerate(plot_data): - # self.plot_new(viz_data, data.sel({var_dim: [var]}), var_dim, time_dim, new_dim, f"{plot_index}_{i}", - # sampling) - - # for i, time_pos in enumerate([0.25, 1.5, 2.75, 4]): # [0.25, 1.5, 2.75, 4] x 365 days - # try: - # - # plot_data = coll[-1] - # pos = int(time_pos * 365 * fs) - # filter_example = plot_data.isel({time_dim: pos}) - # t0 = filter_example.coords[time_dim].values - # - # slice_tmp = slice(pos - abs(plot_data.coords[new_dim].values.min()), pos + abs(plot_data.coords[new_dim].values.min())) - # t_slice = plot_data.isel({time_dim: slice_tmp}).coords[time_dim].values - # self.plot(data.sel({var_dim: [var]}), filter_example, var_dim, time_dim, t_slice, t0, f"{plot_index}_{i}") - # except IndexError: - # pass - - logging.info(f"{data.coords['Stations'].values[0]}: concat all variables") + logging.debug(f"{data.coords['Stations'].values[0]}: concat all variables") res = xr.concat(coll, var_dim) # create result array with same shape like input data, gabs are filled by nans - logging.info(f"{data.coords['Stations'].values[0]}: create res_full") + logging.debug(f"{data.coords['Stations'].values[0]}: create res_full") new_coords = {**{k: data.coords[k].values for k in data.coords if k != new_dim}, new_dim: res.coords[new_dim]} dims = [*data.dims, new_dim] if new_dim not in data.dims else data.dims @@ -751,8 +503,7 @@ class ClimateFIRFilter: res.name = index_name return res - def _plot(self, sampling): - new_dim = "window" + def _plot(self, sampling, new_dim="window"): h = None td_type = {"1d": "D", "1H": "h"}.get(sampling) if self.plot_path is None: @@ -761,6 +512,7 @@ class ClimateFIRFilter: if not os.path.exists(plot_folder): os.makedirs(plot_folder) + # set plot parameter rc_params = {'axes.labelsize': 'large', 'xtick.labelsize': 'large', 'ytick.labelsize': 'large', @@ -828,7 +580,7 @@ class ClimateFIRFilter: label="estimated future") # clim filter response - filt = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input, + filt = xr.apply_ufunc(fir_filter_convolve, filter_input, input_core_dims=[[new_dim]], output_core_dims=[[new_dim]], vectorize=True, @@ -839,7 +591,7 @@ class ClimateFIRFilter: residuum_estimated = filter_input - filt # ideal filter response - filt = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_nc, + filt = xr.apply_ufunc(fir_filter_convolve, filter_input_nc, input_core_dims=[[new_dim]], output_core_dims=[[new_dim]], vectorize=True, @@ -882,93 +634,6 @@ class ClimateFIRFilter: plt.savefig(plot_name, dpi=300) plt.close('all') - def plot_new(self, viz_data, orig_data, var_dim, time_dim, new_dim, plot_index, sampling): - try: - td_type = {"1d": "D", "1H": "h"}.get(sampling) - filter_example = viz_data["filt"] - filter_input = viz_data["filter_input"] - filter_nc = viz_data["filt_nc"] - valid_range = viz_data["valid_range"] - t0 = viz_data["t0"] - t_minus = t0 + np.timedelta64(filter_input.coords[new_dim].values.min(), 
td_type) - t_plus = t0 + np.timedelta64(filter_input.coords[new_dim].values.max(), td_type) - t_slice = slice(t_minus, t_plus) - data = orig_data.sel({time_dim: t_slice}) - plot_folder = os.path.join(os.path.abspath(self.plot_path), "climFIR") - if not os.path.exists(plot_folder): - os.makedirs(plot_folder) - - for var in data.coords[var_dim]: - time_axis = data.sel({var_dim: var, time_dim: t_slice}).coords[time_dim].values - rc_params = {'axes.labelsize': 'large', - 'xtick.labelsize': 'large', - 'ytick.labelsize': 'large', - 'legend.fontsize': 'large', - 'axes.titlesize': 'large', - } - plt.rcParams.update(rc_params) - fig, ax = plt.subplots() - - ax.axvspan(t0 + np.timedelta64(-valid_range.start, td_type), - t0 + np.timedelta64(valid_range.stop - 1, td_type), color="whitesmoke", label="valid area") - - ax.axvline(t0, color="lightgrey", lw=6, label="time of interest ($t_0$)") - ax.plot(time_axis, data.sel({var_dim: var, time_dim: t_slice}).values.flatten(), - color="darkgrey", linestyle="dashed", label="original") - d_tmp = filter_input.sel( - {var_dim: var, new_dim: slice(0, filter_input.coords[new_dim].values.max())}).values.flatten() - # ax.plot(time_axis[len(time_axis) - len(d_tmp):], d_tmp, color="darkgrey", linestyle=(0 ,(1, 1)), label="filter input") - ax.plot(time_axis[len(time_axis) - len(d_tmp):], d_tmp, color="darkgrey", linestyle="solid", - label="estimated future") - # data.sel({var_dim: var, time_dim: time_dim_slice}).plot() - # tmp_comb.sel({var_dim: var}).plot() - # d_filt = filter_example.sel({var_dim: var}).values.flatten() - ax.plot(time_axis, filter_example.sel({var_dim: var}).values.flatten(), - color="black", linestyle="solid", label="filter response", linewidth=2) - ax.plot(time_axis, filter_nc.sel({var_dim: var}).values.flatten(), - color="black", linestyle="dashed", label="ideal filter response", linewidth=2) - plt.title(f"Input of ClimFilter ({str(var.values)})") - plt.legend() - fig.autofmt_xdate() - plt.tight_layout() - plot_name = os.path.join(plot_folder, f"climFIR_{self.plot_name}_{str(var.values)}_{plot_index}.pdf") - plt.savefig(plot_name, dpi=300) - plt.close('all') - except: - pass - - def plot(self, data, tmp_comb, var_dim, time_dim, time_dim_slice, t0, plot_index): - try: - plot_folder = os.path.join(os.path.abspath(self.plot_path), "climFIR") - if not os.path.exists(plot_folder): - os.makedirs(plot_folder) - for var in data.coords[var_dim]: - time_axis = data.sel({var_dim: var, time_dim: time_dim_slice}).coords[time_dim].values - rc_params = {'axes.labelsize': 'large', - 'xtick.labelsize': 'large', - 'ytick.labelsize': 'large', - 'legend.fontsize': 'large', - 'axes.titlesize': 'large', - } - plt.rcParams.update(rc_params) - fig, ax = plt.subplots() - ax.axvline(t0, color="lightgrey", lw=6, label="time of interest ($t_0$)") - ax.plot(time_axis, data.sel({var_dim: var, time_dim: time_dim_slice}).values.flatten(), - color="darkgrey", linestyle="--", label="original") - d_filt = tmp_comb.sel({var_dim: var}).values.flatten() - ax.plot(time_axis[:len(d_filt)], d_filt, color="black", label="filter input") - # data.sel({var_dim: var, time_dim: time_dim_slice}).plot() - # tmp_comb.sel({var_dim: var}).plot() - plt.title(f"Input of ClimFilter ({str(var.values)})") - plt.legend() - fig.autofmt_xdate() - plt.tight_layout() - plot_name = os.path.join(plot_folder, f"climFIR_{self.plot_name}_{str(var.values)}_{plot_index}.pdf") - plt.savefig(plot_name, dpi=300) - plt.close('all') - except: - pass - @property def filter_coefficients(self): return self._h @@ -1016,69 
+681,10 @@ def fir_filter(data, fs, order=5, cutoff_low=None, cutoff_high=None, window="ham return filtered, h -def fir_filter_numpy_vectorized(filter_input_data, var_dim, new_dim, kwargs): - filt_np = xr.DataArray(np.nan, coords=filter_input_data.coords) - for var in filter_input_data.coords[var_dim]: - logging.info( - f"{filter_input_data.coords['Stations'].values[0]}: {str(var.values)}") # ToDo must be removed, just for debug - a = np.apply_along_axis(fir_filter_vectorized, filter_input_data.dims.index(new_dim), - filter_input_data.sel({var_dim: var}).values, **kwargs) - filt_np.loc[{var_dim: var}] = a - return filt_np - - -def fir_filter_convolve_vectorized(data, h): +def fir_filter_convolve(data, h): return signal.convolve(data, h, mode='same', method="direct") / sum(h) -def fir_filter_vectorized(data, time_stamp=None, fs=1, order=5, cutoff_low=None, cutoff_high=None, window="hamming", - h=None, - causal=True, - padlen=None): - """Expects numpy array.""" - if time_stamp is not None: - pd_date = pd.to_datetime(time_stamp) - if pd_date.day == 1 and pd_date.month in [1, 7]: - logging.info(time_stamp) - # sel = ~np.isnan(data) - # res = np.empty_like(data) - if h is None: - cutoff = [] - if cutoff_low is not None: - cutoff += [cutoff_low] - if cutoff_high is not None: - cutoff += [cutoff_high] - if len(cutoff) == 2: - filter_type = "bandpass" - elif len(cutoff) == 1 and cutoff_low is not None: - filter_type = "highpass" - elif len(cutoff) == 1 and cutoff_high is not None: - filter_type = "lowpass" - else: - raise ValueError("Please provide either cutoff_low or cutoff_high.") - h = signal.firwin(order, cutoff, pass_zero=filter_type, fs=fs, window=window) - if causal: - # y = signal.lfilter(h, 1., data[sel]) - y = signal.lfilter(h, 1., data) - else: - padlen = padlen if padlen is not None else 3 * len(h) - # print(sum(sel)) - # if sum(sel) <= padlen: - # y = np.empty_like(data[sel]) - # else: - # y = signal.filtfilt(h, 1., data[sel], padlen=padlen) - y = signal.filtfilt(h, 1., data, padlen=padlen) - # res[sel] = y - # return res - return y - - -def fir_filter_vectorized_short(data, h=None, padlen=None): - """Expects numpy array.""" - y = signal.filtfilt(h, 1., data, padlen=padlen) - return y - - class KolmogorovZurbenkoBaseClass: def __init__(self, df, wl, itr, is_child=False, filter_dim="window"): @@ -1287,3 +893,25 @@ class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass): return df_itr except ValueError: raise ValueError + + +def firwin_kzf(m, k): + coef = np.ones(m) + for i in range(1, k): + t = np.zeros((m, m + i * (m - 1))) + for km in range(m): + t[km, km:km + coef.size] = coef + coef = np.sum(t, axis=0) + return coef / m ** k + + +def omega_null_kzf(m, k, alpha=0.5): + a = np.sqrt(6) / np.pi + b = 1 / (2 * np.array(k)) + c = 1 - alpha ** b + d = np.array(m) ** 2 - alpha ** b + return a * np.sqrt(c / d) + + +def filter_width_kzf(m, k): + return k * (m - 1) + 1 diff --git a/mlair/helpers/helpers.py b/mlair/helpers/helpers.py index 7eab911122cb4dc030887bed476a4e253553332e..499f8258a168851e74ca3eab0d0af5a11cb13bec 100644 --- a/mlair/helpers/helpers.py +++ b/mlair/helpers/helpers.py @@ -9,7 +9,7 @@ import numpy as np import xarray as xr import dask.array as da -from typing import Dict, Callable, Union, List, Any +from typing import Dict, Callable, Union, List, Any, Tuple def to_list(obj: Any) -> List: @@ -68,9 +68,9 @@ def float_round(number: float, decimals: int = 0, round_type: Callable = math.ce return round_type(number * multiplier) / multiplier -def 
remove_items(obj: Union[List, Dict], items: Any): +def remove_items(obj: Union[List, Dict, Tuple], items: Any): """ - Remove item(s) from either list or dictionary. + Remove item(s) from either list, tuple or dictionary. :param obj: object to remove items from (either dictionary or list) :param items: elements to remove from obj. Can either be a list or single entry / key @@ -99,6 +99,8 @@ def remove_items(obj: Union[List, Dict], items: Any): return remove_from_list(obj, items) elif isinstance(obj, dict): return remove_from_dict(obj, items) + elif isinstance(obj, tuple): + return tuple(remove_from_list(to_list(obj), items)) else: raise TypeError(f"{inspect.stack()[0][3]} does not support type {type(obj)}.")