diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index f83327dbfccd13278c81b03c59a94098563de0ed..cb381a2a4875ae6cc5bb3d4951624cb562aaf72f 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -393,7 +393,7 @@ class DataHandlerMixedSamplingWithClimateAndFirFilter(DataHandlerMixedSamplingWi if parameter_name in kwargs: window_opt = kwargs.pop(parameter_name) if isinstance(window_opt, dict): - window_opt = window_opt[key] + window_opt = window_opt.get(key) kwargs[parameter_name] = window_opt def _create_collection(self): @@ -417,6 +417,7 @@ class DataHandlerMixedSamplingWithClimateAndFirFilter(DataHandlerMixedSamplingWi kwargs_chem["variables"] = chem_vars cls.adjust_window_opts("chem", "window_history_size", kwargs_chem) cls.adjust_window_opts("chem", "window_history_offset", kwargs_chem) + cls.adjust_window_opts("chem", "extend_length_opts", kwargs_chem) dh_transformation = (cls.data_handler_climate_fir, cls.data_handler_unfiltered) transformation_chem = super().transformation(set_stations, tmp_path=tmp_path, dh_transformation=dh_transformation, **kwargs_chem) @@ -426,6 +427,7 @@ class DataHandlerMixedSamplingWithClimateAndFirFilter(DataHandlerMixedSamplingWi kwargs_meteo["variables"] = meteo_vars cls.adjust_window_opts("meteo", "window_history_size", kwargs_meteo) cls.adjust_window_opts("meteo", "window_history_offset", kwargs_meteo) + cls.adjust_window_opts("meteo", "extend_length_opts", kwargs_meteo) dh_transformation = (cls.data_handler_fir, cls.data_handler_unfiltered) transformation_meteo = super().transformation(set_stations, tmp_path=tmp_path, dh_transformation=dh_transformation, **kwargs_meteo) diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index 9d274558e3805334cd706fe4e2c9a914ad6e5f58..ec20c9d02221f3096b6f372cb0b81e96d41b6ff2 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -32,6 +32,15 @@ data_or_none = Union[xr.DataArray, None] class DataHandlerSingleStation(AbstractDataHandler): + """ + :param window_history_offset: used to include future values into history / inputs. Example: set + window_history_size=48 and window_history_offset=12 to use 2 days as history / input but not on interval t0 + + [-48h, 0h] but t0 + [-36h, 12h]. This parameter is especially used together with a 1d target variable (mixed + sampling) that measurement interval is not from 0000 to 0000 but 1700 (day before) to 1700 (day of t0) or + similar (e.g. dma8eu ozone). Then it is possible to use window_history_size=48+16 and window_history_offset=16 + to use 3 days of input without all timesteps starting at 1700 the day before (as they are included in the + target). + """ DEFAULT_STATION_TYPE = "background" DEFAULT_NETWORK = "AIRBASE" DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py index 07fdc41fc4dae49bd44a071dd2228c4bff860b04..95d9f66d9c36da4c5f75fa079e2b3d2675108c64 100644 --- a/mlair/data_handler/data_handler_with_filter.py +++ b/mlair/data_handler/data_handler_with_filter.py @@ -326,6 +326,12 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation apriori_type is None or `zeros`, and a climatology of the residuum is used for `residuum_stats`. :param apriori_diurnal: use diurnal anomalies of each hour as addition to the apriori information type chosen by parameter apriori_type. This is only applicable for hourly resolution data. + :param extend_length_opts: use this parameter to use future data in the filter calculation. This parameter does not + affect the size of the history samples as this is handled by the window_history_offset parameter. Example: set + extend_length_opts=7*24 to use the observation of the next 7 days to calculate the filtered components. Which + data are finally used for the input samples is not affected by these 7 days. In case the window_history_offset + parameter exceeds the `extend_length_opts`, it has also an influence on the filter component calculation as it + will return also climatological estimates. """ _hash = DataHandlerFirFilterSingleStation._hash + ["apriori_type", "apriori_sel_opts", "apriori_diurnal", "extend_length_opts"] @@ -340,7 +346,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation self.all_apriori = None # collection of all apriori information self.apriori_sel_opts = apriori_sel_opts # ensure to separate exogenous and endogenous information self.plot_name_affix = name_affix - self.extend_length_opts = extend_length_opts if extend_length_opts is not None else {} + self.extend_length_opts = extend_length_opts super().__init__(*args, **kwargs) @TimeTrackingWrapper @@ -355,7 +361,8 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation apriori_diurnal=self.apriori_diurnal, sel_opts=self.apriori_sel_opts, plot_path=self.plot_path, minimum_length=self.window_history_size, new_dim=self.window_dim, - station_name=self.station[0], extend_length_opts=self.extend_length_opts) + station_name=self.station[0], extend_length_opts=self.extend_length_opts, + offset=self.window_history_offset) self.climate_filter_coeff = climate_filter.filter_coefficients # store apriori information: store all if residuum_stat method was used, otherwise just store initial apriori @@ -365,17 +372,18 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation self.apriori = climate_filter.initial_apriori_data self.all_apriori = climate_filter.apriori_data - if isinstance(self.extend_length_opts, int): - climate_filter_data = [c.sel({self.window_dim: slice(-self.window_history_size, self.extend_length_opts)}) + if isinstance(self.window_history_offset, int): + climate_filter_data = [c.sel({self.window_dim: slice(-self.window_history_size + self.window_history_offset, + self.window_history_offset)}) for c in climate_filter.filtered_data] else: climate_filter_data = [] for c in climate_filter.filtered_data: coll_tmp = [] for v in c.coords[self.target_dim].values: - upper_lim = self.extend_length_opts.get(v, 0) + delta_lim = self.window_history_offset.get(v, 0) coll_tmp.append(c.sel({self.target_dim: v, - self.window_dim: slice(-self.window_history_size, upper_lim)})) + self.window_dim: slice(-self.window_history_size + delta_lim, delta_lim)})) climate_filter_data.append(xr.concat(coll_tmp, self.target_dim)) # create input data with filter index @@ -444,12 +452,14 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation expression :param dim_name_of_shift: Dimension along shift will be applied """ - data = self.input_data - sampling = {"daily": "D", "hourly": "h"}.get(to_list(self.sampling)[0]) - data.coords[dim_name_of_shift] = data.coords[dim_name_of_shift] - np.timedelta64(self.window_history_offset, - sampling) - data.coords[self.window_dim] = data.coords[self.window_dim] + self.window_history_offset - self.history = data + self.history = self.input_data.__deepcopy__() + # #todo stopped here. maybe there is nothing to do anymore? + # data = self.input_data #todo trim data to be only in range slice(None, self.window_history_offset) ?? + # sampling = {"daily": "D", "hourly": "h"}.get(to_list(self.sampling)[0]) + # data.coords[dim_name_of_shift] = data.coords[dim_name_of_shift] - np.timedelta64(self.window_history_offset, + # sampling) + # data.coords[self.window_dim] = data.coords[self.window_dim] + self.window_history_offset + # self.history = data def call_transform(self, inverse=False): opts_input = self._transformation[0] diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index b25d6ee10f89bfa49c2147d1758a2d24b8e7687e..0cf0efac691ef508b05b7c8c2a65cbe4c785fae1 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -168,7 +168,8 @@ class ClimateFIRFilter(FIRFilter): def __init__(self, data, fs, order, cutoff, window, time_dim, var_dim, apriori=None, apriori_type=None, apriori_diurnal=False, sel_opts=None, plot_path=None, - minimum_length=None, new_dim=None, station_name=None, extend_length_opts: Union[dict, int] = 0): + minimum_length=None, new_dim=None, station_name=None, extend_length_opts: Union[dict, int] = None, + offset: Union[dict, int] = 0): """ :param data: data to filter :param fs: sampling frequency in 1/days -> 1d: fs=1 -> 1H: fs=24 @@ -186,7 +187,12 @@ class ClimateFIRFilter(FIRFilter): resoluted data). The mean anomaly of each hour is added to the apriori_type information. :param extend_length_opts: shift information switch between historical data and apriori estimation by the given values (default None). Must either be a dictionary with keys available in var_dim or a single value that is - applied to all data. + applied to all data. This parameter has only influence on which information is available at t0 for the + filter calculcation but has no influence on the shape of the returned filtered data. + :param offset: This parameter indicates the number of time steps with ti>t0 to return of the filtered data. In + case the offset parameter is larger than the extend_lenght_opts parameter, this leads to the case that not + only observational data but also climatological estimations are returned. Must either be a dictionary with + keys available in var_dim or a single value that is applied to all data. Default is 0. """ self._apriori = apriori self.apriori_type = apriori_type @@ -197,7 +203,7 @@ class ClimateFIRFilter(FIRFilter): self.plot_data = [] self.extend_length_opts = extend_length_opts super().__init__(data, fs, order, cutoff, window, var_dim, time_dim, station_name=station_name, - minimum_length=minimum_length, plot_path=plot_path) + minimum_length=minimum_length, plot_path=plot_path, offset=offset) def run(self): filtered = [] @@ -235,13 +241,18 @@ class ClimateFIRFilter(FIRFilter): window=self.window, var_dim=self.var_dim, minimum_length=_minimum_length, new_dim=new_dim, plot_dates=plot_dates, station_name=self.station_name, - extend_length_opts=self.extend_length_opts) + extend_length_opts=self.extend_length_opts, + offset=self.offset) logging.info(f"{self.station_name}: finished clim_filter calculation") - if self.minimum_length is None: - filtered.append(fi) + # if self.minimum_length is None: #todo does slice(None,None) work? if yes this could be changed to slice(-self.minimum_length, self.offset) + # filtered.append(fi) + # else: + # filtered.append(fi.sel({new_dim: slice(-self.minimum_length, None)})) + if self.minimum_length is None: # todo does slice(None,None) work? if yes this could be changed to slice(-self.minimum_length, self.offset) + filtered.append(fi.sel({new_dim: slice(None, self.offset)})) else: - filtered.append(fi.sel({new_dim: slice(-self.minimum_length, None)})) + filtered.append(fi.sel({new_dim: slice(-self.minimum_length, self.offset)})) h.append(hi) gc.collect() self.plot_data.append(plot_data) @@ -275,10 +286,14 @@ class ClimateFIRFilter(FIRFilter): raise ValueError(f"Cannot handle unkown apriori type: {self.apriori_type}. Please choose from None," f" `zeros` or `residuum_stats`.") # add last residuum to filtered - if self.minimum_length is None: - filtered.append(input_data) + # if self.minimum_length is None: #todo does slice(None,None) work? if yes this could be changed to slice(-self.minimum_length, self.offset) + # filtered.append(input_data) + # else: + # filtered.append(input_data.sel({new_dim: slice(-self.minimum_length, None)})) + if self.minimum_length is None: # todo does slice(None,None) work? if yes this could be changed to slice(-self.minimum_length, self.offset) + filtered.append(input_data.sel({new_dim: slice(None, self.offset)})) else: - filtered.append(input_data.sel({new_dim: slice(-self.minimum_length, None)})) + filtered.append(input_data.sel({new_dim: slice(-self.minimum_length, self.offset)})) self._filtered = filtered self._h = h @@ -625,7 +640,7 @@ class ClimateFIRFilter(FIRFilter): @staticmethod def _trim_data_to_minimum_length(data: xr.DataArray, extend_length_history: int, dim: str, - minimum_length: int = None, extend_length_opts: int = 0) -> xr.DataArray: + minimum_length: int = None, extend_length_future: int = 0) -> xr.DataArray: """ Trim data along given axis between either -minimum_length (if given) or -extend_length_history and extend_length_opts (which is default set to 0). @@ -635,13 +650,13 @@ class ClimateFIRFilter(FIRFilter): minimum_length is not provided :param dim: dim to apply trim on :param minimum_length: start number for trim range (transformed to negative), preferably used (default None) - :param extend_length_opts: number to use in "future" + :param extend_length_future: number to use in "future" :returns: trimmed data """ if minimum_length is None: - return data.sel({dim: slice(-extend_length_history, extend_length_opts)}, drop=True) + return data.sel({dim: slice(-extend_length_history, extend_length_future)}, drop=True) else: - return data.sel({dim: slice(-minimum_length, extend_length_opts)}, drop=True) + return data.sel({dim: slice(-minimum_length, extend_length_future)}, drop=True) @staticmethod def _create_full_filter_result_array(template_array: xr.DataArray, result_array: xr.DataArray, new_dim: str, @@ -666,10 +681,13 @@ class ClimateFIRFilter(FIRFilter): def clim_filter(self, data, fs, cutoff_high, order, apriori=None, sel_opts=None, sampling="1d", time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming", minimum_length=None, new_dim="window", plot_dates=None, station_name=None, - extend_length_opts: Union[dict, int] = None): + extend_length_opts: Union[dict, int] = None, offset: Union[dict, int] = None): logging.debug(f"{station_name}: extend apriori") - extend_opts = extend_length_opts if extend_length_opts is not None else {} + # set data horizon + offset = offset if offset is not None else {} + # if extent_length_opts is not given, use same as offset + extend_opts = extend_length_opts if extend_length_opts is not None else offset # calculate apriori information from data if not given and extend its range if not sufficient long enough if apriori is None: @@ -698,6 +716,7 @@ class ClimateFIRFilter(FIRFilter): logging.info(f"{station_name} ({var}): sel data") _start, _end = self._get_year_interval(data, time_dim) + offset_var = offset.get(var, 0) if isinstance(offset, dict) else offset extend_opts_var = extend_opts.get(var, 0) if isinstance(extend_opts, dict) else extend_opts filt_coll = [] for _year in range(_start, _end + 1): @@ -705,7 +724,8 @@ class ClimateFIRFilter(FIRFilter): # select observations and apriori data time_slice = self._create_time_range_extend( - _year, sampling, max(extend_length_history, extend_length_future + extend_opts_var)) + _year, sampling, max(extend_length_history, extend_length_future + max(extend_opts_var, + offset_var))) d = data.sel({var_dim: [var], time_dim: time_slice}) a = apriori.sel({var_dim: [var], time_dim: time_slice}) if len(d.coords[time_dim]) == 0: # no data at all for this year @@ -713,8 +733,7 @@ class ClimateFIRFilter(FIRFilter): # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] filter_input_data = self.combine_observation_and_apriori(d, a, time_dim, new_dim, extend_length_history, - extend_length_future, - extend_length_separator=extend_opts_var) + extend_length_future + offset_var, extend_length_separator=extend_opts_var) # select only data for current year try: @@ -733,13 +752,13 @@ class ClimateFIRFilter(FIRFilter): # trim data if required trimmed = self._trim_data_to_minimum_length(filt, extend_length_history, new_dim, minimum_length, - extend_length_opts=extend_opts_var) + extend_length_future=max(extend_opts_var, offset_var)) filt_coll.append(trimmed) # visualization plot_data.extend(self.create_visualization(filt, d, filter_input_data, plot_dates, time_dim, new_dim, sampling, extend_length_history, extend_length_future, - minimum_length, h, var, extend_opts_var)) + minimum_length, h, var, extend_opts_var)) #todo not updated with offset_var # collect all filter results coll.append(xr.concat(filt_coll, time_dim))