diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py
index 0619c74abc6b59e471a318406dec094486cf0966..67902cd0eae0c6698021baef4ea066ff850ba01a 100644
--- a/mlair/data_handler/data_handler_with_filter.py
+++ b/mlair/data_handler/data_handler_with_filter.py
@@ -289,12 +289,12 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
     :param apriori: Data to use as apriori information. This should be either a xarray dataarray containing monthly or
         any other heuristic to support the clim filter, or a list of such arrays containint heuristics for all residua
-        in addition. The 2nd can be used together with apriori_type `residuum_stat` which estimates the error of the
+        in addition. The 2nd can be used together with apriori_type `residuum_stats` which estimates the error of the
         residuum when the clim filter should be applied with exogenous parameters. If apriori_type is None/`zeros` data
         can be provided, but this is not required in this case.
     :param apriori_type: set type of information that is provided to the clim filter. For the first low pass always a
         calculated or given statistic is used. For residuum prediction a constant value of zero is assumed if
-        apriori_type is None or `zeros`, and a climatology of the residuum is used for `residuum_stat`.
+        apriori_type is None or `zeros`, and a climatology of the residuum is used for `residuum_stats`.
     """

     _requirements = remove_items(DataHandlerFirFilterSingleStation.requirements(), "station")
@@ -312,15 +312,15 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation

     @TimeTrackingWrapper
     def apply_filter(self):
         """Apply FIR filter only on inputs."""
-        apriori = self.apriori.get(str(self)) if isinstance(self.apriori, dict) else self.apriori
+        self.apriori = self.apriori.get(str(self)) if isinstance(self.apriori, dict) else self.apriori
         climate_filter = ClimateFIRFilter(self.input_data, self.fs, self.filter_order, self.filter_cutoff_freq,
                                           self.filter_window_type, time_dim=self.time_dim, var_dim=self.target_dim,
-                                          apriori_type=self.apriori_type, apriori=apriori,
+                                          apriori_type=self.apriori_type, apriori=self.apriori,
                                           sel_opts=self.apriori_sel_opts)
         self.climate_filter_coeff = climate_filter.filter_coefficients
         # store apriori information: store all if residuum_stat method was used, otherwise just store initial apriori
-        if self.apriori_type == "residuum_stat":
+        if self.apriori_type == "residuum_stats":
             self.apriori = climate_filter.apriori_data
         else:
             self.apriori = climate_filter.initial_apriori_data
diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py
index 4c386885f66133e009ea211b9586cb07c38f28b4..4fce4f50f106ea4ec11f74724ba6089f80955c3e 100644
--- a/mlair/helpers/filter.py
+++ b/mlair/helpers/filter.py
@@ -1,7 +1,9 @@
 import gc
 import warnings
 from typing import Union
+import logging
+import datetime

 import numpy as np
 import pandas as pd
 from matplotlib import pyplot as plt
@@ -76,47 +78,137 @@ class ClimateFIRFilter:
         apriori_list = to_list(apriori)
         input_data = data.__deepcopy__()
         for i in range(len(order)):
+            # calculate climatological filter
             fi, hi, apriori = self.clim_filter(input_data, fs, cutoff[i], order[i], apriori=apriori_list[i],
                                                sel_opts=sel_opts, sampling=sampling, time_dim=time_dim, window=window,
                                                var_dim=var_dim)
             filtered.append(fi)
             h.append(hi)
-            input_data = input_data - fi  # calculate residuum
+
+            # calculate residuum
+            input_data = input_data - fi
+
+            # create new apriori information for next iteration if no further apriori is provided
             if len(apriori_list) <= i + 1:
-                if apriori_type is None or apriori_type == "zeros":
-                    apriori_list.append(xr.zeros_like(apriori_list[i]))  # zero version
-                elif apriori_type == "residuum_stats":
+                if apriori_type is None or apriori_type == "zeros":  # zero version
+                    apriori_list.append(xr.zeros_like(apriori_list[i]))
+                elif apriori_type == "residuum_stats":  # calculate monthly statistic on residuum
                     apriori_list.append(-self.create_monthly_mean(input_data, sel_opts=sel_opts, sampling=sampling,
                                                                   time_dim=time_dim))
                 else:
                     raise ValueError(f"Cannot handle unkown apriori type: {apriori_type}. Please choose from None, "
                                      f"`zeros` or `residuum_stats`.")
-        # add residuum to filtered
+        # add last residuum to filtered
         filtered.append(input_data)
         self._filtered = filtered
         self._h = h
         self._apriori = apriori_list

     @staticmethod
-    def create_monthly_mean(data, sel_opts=None, sampling="1d", time_dim="datetime"):
-        monthly = xr.ones_like(data)
+    def create_unity_array(data, time_dim, extend_range=365):
+        """Create a xr data array filled with ones. time_dim is extended by extend_range days in future and past."""
+        coords = data.coords
+
+        # extend time_dim by given extend_range days
+        start = coords[time_dim][0].values.astype("datetime64[D]") - np.timedelta64(extend_range, "D")
+        end = coords[time_dim][-1].values.astype("datetime64[D]") + np.timedelta64(extend_range, "D")
+        new_time_axis = np.arange(start, end).astype("datetime64[ns]")
+
+        # construct data array with updated coords
+        new_coords = {k: data.coords[k].values if k != time_dim else new_time_axis for k in coords}
+        new_array = xr.DataArray(1, coords=new_coords, dims=new_coords.keys()).transpose(*data.dims)
+
+        # loffset is required because resampling uses last day in month as resampling timestamp
+        return new_array.resample({time_dim: "1m"}, loffset=datetime.timedelta(days=-15)).max()
+
+    def create_monthly_mean(self, data, sel_opts=None, sampling="1d", time_dim="datetime"):
+        """Calculate monthly statistics."""
+
+        # create unity xarray in monthly resolution with sampling point in mid of each month
+        monthly = self.create_unity_array(data, time_dim)
+
+        # apply selection if given (only use subset for monthly means)
         if sel_opts is not None:
             data = data.sel(**sel_opts)
+
+        # create monthly mean and replace entries in unity array
         monthly_mean = data.groupby(f"{time_dim}.month").mean()
         for month in monthly_mean.month.values:
             loc = (monthly[f"{time_dim}.month"] == month)
             monthly.loc[{time_dim: loc}] = monthly_mean.sel(month=month)
-        return monthly.resample({time_dim: "1m"}).mean().resample({time_dim: sampling}).interpolate()
+
+        # aggregate monthly information (shift by half month, because resample base is last day)
+        return monthly.resample({time_dim: "1m"}).max().resample({time_dim: sampling}).interpolate()
+
+    @staticmethod
+    def extend_apriori(data, apriori, time_dim):
+        """
+        Extend time range of apriori information.
+
+        This method will fail if apriori is available for a shorter period than the gap to fill.
+        """
+        dates = data.coords[time_dim].values
+
+        # apriori starts after data
+        if dates[0] < apriori.coords[time_dim].values[0]:
+            # add difference in full years
+            date_diff = abs(dates[0] - apriori.coords[time_dim].values[0]).astype("timedelta64[D]")
+            extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365
+            coords = apriori.coords
+
+            # create new time axis
+            start = coords[time_dim][0].values.astype("datetime64[D]") - np.timedelta64(extend_range, "D")
+            end = coords[time_dim][0].values.astype("datetime64[D]")
+            new_time_axis = np.arange(start, end).astype("datetime64[ns]")
+
+            # extract old values to use with new axis
+            start = coords[time_dim][0].values.astype("datetime64[D]")
+            end = coords[time_dim][0].values.astype("datetime64[D]") + np.timedelta64(extend_range - 1, "D")
+            new_values = apriori.sel({time_dim: slice(start, end)})
+            new_values.coords[time_dim] = new_time_axis
+
+            # add new values to apriori
+            apriori = apriori.combine_first(new_values)
+
+        # apriori ends before data
+        if dates[-1] + np.timedelta64(365, "D") > apriori.coords[time_dim].values[-1]:
+            # add difference in full years + 1 year (because apriori is used as future estimate)
+            date_diff = abs(dates[-1] - apriori.coords[time_dim].values[-1]).astype("timedelta64[D]")
+            extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365 + 365
+            coords = apriori.coords
+
+            # create new time axis
+            start = coords[time_dim][-1].values.astype("datetime64[D]")
+            end = coords[time_dim][-1].values.astype("datetime64[D]") + np.timedelta64(extend_range, "D")
+            new_time_axis = np.arange(start, end).astype("datetime64[ns]")
+
+            # extract old values to use with new axis
+            start = coords[time_dim][-1].values.astype("datetime64[D]") - np.timedelta64(extend_range - 1, "D")
+            end = coords[time_dim][-1].values.astype("datetime64[D]")
+            new_values = apriori.sel({time_dim: slice(start, end)})
+            new_values.coords[time_dim] = new_time_axis
+
+            # add new values to apriori
+            apriori = apriori.combine_first(new_values)
+
+        return apriori

     def clim_filter(self, data, fs, cutoff_high, order, apriori=None, padlen=None, sel_opts=None, sampling="1d",
                     time_dim="datetime", var_dim="variables", window="hamming"):
+
+        # calculate apriori information from data if not given and extend its range if it is not sufficiently long
         if apriori is None:
             apriori = self.create_monthly_mean(data, sel_opts=sel_opts, sampling=sampling, time_dim=time_dim)
+        apriori = self.extend_apriori(data, apriori, time_dim)
+
+        # calculate FIR filter coefficients
         h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window)
         length = len(h)
+
+        # start loop on all timestamps
         dt = data.coords[time_dim].values
         res = xr.zeros_like(data)
-        print("start iteration")
+        logging.info("start iteration")
         for i in range(0, len(dt)):
             t0 = dt[i]
             pd_date = pd.to_datetime(t0)
@@ -135,7 +227,7 @@ class ClimateFIRFilter:
                                            padlen=_padlen, dim=var_dim, window=window, h=h)
                 res.loc[{time_dim: t0}] = tmp_filter.loc[{time_dim: t0}]
             except IndexError:
-                pass
+                res.loc[{time_dim: t0}] = np.nan
 #            if i == 720:
 #                for var in data.coords[var_dim]:
 #                    data.sel({var_dim: var, time_dim: slice(dt[i_m], dt[i_p+1])}).plot()
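Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the a-priori idea these changes rely on. Toy data and names (`data`, `apriori`, `residuum`) are illustrative and not the MLAir API; it only shows a monthly climatology used as a-priori estimate and the residuum whose monthly statistic (negated) `apriori_type="residuum_stats"` would feed into the next filter iteration.

```python
import numpy as np
import pandas as pd
import xarray as xr

# toy daily series with an annual cycle plus noise (stand-in for a station time series)
time = pd.date_range("2015-01-01", "2017-12-31", freq="D")
values = 10 + 5 * np.sin(2 * np.pi * time.dayofyear / 365) + np.random.randn(len(time))
data = xr.DataArray(values, coords={"datetime": time}, dims="datetime")

# monthly climatology as a-priori estimate (same idea as create_monthly_mean)
monthly_mean = data.groupby("datetime.month").mean()

# map the climatology back onto the daily time axis
apriori = monthly_mean.sel(month=data["datetime"].dt.month).drop_vars("month")

# residuum of the first filter iteration; with apriori_type="residuum_stats" its
# (negated) monthly statistic becomes the a-priori information for the next iteration
residuum = data - apriori
```

In the patch itself, `extend_apriori` additionally pads this climatology by whole years on both ends, so the climate filter has an estimate available beyond the edges of the observed period.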