From 4cfa8a110d2532eac035cbfde385b39ffe131b5d Mon Sep 17 00:00:00 2001
From: leufen1 <l.leufen@fz-juelich.de>
Date: Tue, 13 Jun 2023 21:09:00 +0200
Subject: [PATCH] filter can now combine obs, forecast, and apriori for first
 iteration. Further adjustments are required

---
 .../data_handler_mixed_sampling.py            |  37 +++++
 .../data_handler_single_station.py            |   2 +-
 .../data_handler/data_handler_with_filter.py  |   5 +-
 mlair/helpers/data_sources/data_loader.py     |  14 +-
 mlair/helpers/data_sources/era5.py            |   6 +-
 mlair/helpers/data_sources/ifs.py             |   7 +-
 mlair/helpers/data_sources/toar_data.py       |   7 +-
 mlair/helpers/filter.py                       | 132 ++++++++++++++----
 8 files changed, 162 insertions(+), 48 deletions(-)

diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py
index eaa6a211..4a0bbf68 100644
--- a/mlair/data_handler/data_handler_mixed_sampling.py
+++ b/mlair/data_handler/data_handler_mixed_sampling.py
@@ -466,3 +466,40 @@ class DataHandlerMixedSamplingWithClimateAndFirFilter(DataHandlerMixedSamplingWi
         else:  # if no unfiltered meteo branch
             transformation_res["filtered_meteo"] = transformation_meteo
         return transformation_res if len(transformation_res) > 0 else None
+
+
+class DataHandlerIFSSingleStation(DataHandlerMixedSamplingWithClimateFirFilterSingleStation):
+
+    def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]:
+        start, end = self.update_start_end(ind)
+        vars = [self.variables, self.target_var]
+        stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind])
+
+        data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind],
+                                         self.store_data_locally, self.data_origin, start, end)
+        if ind == 1:  # only for target
+            data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind],
+                                    limit=self.interpolation_limit[ind], sampling=self.sampling[ind])
+        return data
+
+    def make_input_target(self):
+        """
+        A FIR filter is applied to the input data that has hourly resolution. Labels Y are provided as aggregated values
+        with daily resolution.
+        """
+        self._data = tuple(map(self.load_and_interpolate, [0, 1]))  # load input (0) and target (1) data
+        self.set_inputs_and_targets()
+        self.apply_filter()
+
+
+class DataHandlerIFS(DataHandlerMixedSamplingWithClimateAndFirFilter):
+
+    data_handler_fir = (DataHandlerMixedSamplingWithFirFilterSingleStation,
+                        DataHandlerIFSSingleStation)
+
+    @classmethod
+    def set_data_handler_fir_pos(cls, **kwargs):
+        """
+        Set position of fir data handler to always use climate FIR.
+        """
+        cls.data_handler_fir_pos = 1  # use slower fir version with climate estimate
diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py
index 76f73bf4..552123f3 100644
--- a/mlair/data_handler/data_handler_single_station.py
+++ b/mlair/data_handler/data_handler_single_station.py
@@ -393,7 +393,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
         """
         used_chem_vars = list(set(self.chem_vars) & set(data.coords[self.target_dim].values))
         if len(used_chem_vars) > 0:
-            data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum)
+            data = data.sel({self.target_dim: used_chem_vars}).clip(min=minimum).combine_first(data)
         return data

     def setup_data_path(self, data_path: str, sampling: str):
diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py
index e5760e9a..4ec25a83 100644
--- a/mlair/data_handler/data_handler_with_filter.py
+++ b/mlair/data_handler/data_handler_with_filter.py
@@ -383,7 +383,8 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
                                           plot_path=self.plot_path, minimum_length=self.window_history_size,
                                           new_dim=self.window_dim, display_name=self.station[0],
                                           extend_length_opts=self.extend_length_opts,
-                                          offset=self.window_history_end, plot_dates=self.plot_dates)
+                                          extend_end=self.window_history_end, plot_dates=self.plot_dates,
+                                          offset=self.window_history_offset)
         self.climate_filter_coeff = climate_filter.filter_coefficients

         # store apriori information: store all if residuum_stat method was used, otherwise just store initial apriori
@@ -459,7 +460,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
         :param window: this parameter is not used in the inherited method
         :param dim_name_of_shift: Dimension along shift will be applied
         """
-        data = self.input_data
+        data = self.input_data  # TODO has to be refactored as window_history_offset must be included in filter calc
         sampling = {"daily": "D", "hourly": "h"}.get(to_list(self.sampling)[0])
         data.coords[dim_name_of_shift] = data.coords[dim_name_of_shift] - np.timedelta64(self.window_history_offset,
                                                                                          sampling)
diff --git a/mlair/helpers/data_sources/data_loader.py b/mlair/helpers/data_sources/data_loader.py
index 4e69c006..094ce20d 100644
--- a/mlair/helpers/data_sources/data_loader.py
+++ b/mlair/helpers/data_sources/data_loader.py
@@ -54,16 +54,18 @@ def download_data(file_name: str, meta_file: str, station, statistics_per_var, s
         # load era5 data
         df_era5_local, meta_era5_local = data_sources.era5.load_era5(
             station_name=station, stat_var=era5_local_stats, sampling=sampling, data_origin=era5_local_origin,
-            window_dim=window_dim, time_dim=time_dim)
+            window_dim=window_dim, time_dim=time_dim, target_dim=target_dim)

     if ifs_local_origin is not None and len(ifs_local_stats) > 0:  # load era5 data
-        df_ifs_local, meta_ifs5_local = data_sources.ifs.load_ifs(
+        df_ifs_local, meta_ifs_local = data_sources.ifs.load_ifs(
             station_name=station, stat_var=ifs_local_stats, sampling=sampling, data_origin=ifs_local_origin,
-            lead_time_dim=window_dim, initial_time_dim=time_dim)
-    if toar_origin is None or len(toar_stats) > 0:  # TODO return toar data as xarray
+            lead_time_dim=window_dim, initial_time_dim=time_dim, target_dim=target_dim)

+    if toar_origin is None or len(toar_stats) > 0:
         # load combined data from toar-data (v2 & v1)
         df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats,
-                                                                  sampling=sampling, data_origin=toar_origin)
+                                                                  sampling=sampling, data_origin=toar_origin,
+                                                                  window_dim=window_dim, time_dim=time_dim,
+                                                                  target_dim=target_dim)

     valid_df = [e for e in [df_era5_local, df_toar, df_ifs_local] if e is not None]
     if len(valid_df) == 0:
@@ -121,7 +123,7 @@ def get_data(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5) ->
             else:
                 logging.debug(f"There was an error (STATUS {response.status_code}) for request {url}")
         except Exception as e:
-            time.sleep(retry)
+            time.sleep(2 * (2 ** retry))
             logging.debug(f"There was an error for request {url}: {e}")
             if retry + 1 >= max_retries:
                 raise EmptyQueryResult(f"There was an RetryError for request {url}: {e}")
diff --git a/mlair/helpers/data_sources/era5.py b/mlair/helpers/data_sources/era5.py
index 66117106..156df00d 100644
--- a/mlair/helpers/data_sources/era5.py
+++ b/mlair/helpers/data_sources/era5.py
@@ -17,7 +17,7 @@ from mlair.helpers.data_sources.data_loader import EmptyQueryResult
 from mlair.helpers.meteo import relative_humidity_from_dewpoint


-def load_era5(station_name, stat_var, sampling, data_origin, time_dim, window_dim):
+def load_era5(station_name, stat_var, sampling, data_origin, time_dim, window_dim, target_dim):

     # make sure station_name parameter is a list
     station_name = helpers.to_list(station_name)
@@ -62,13 +62,13 @@ def load_era5(station_name, stat_var, sampling, data_origin, time_dim, window_di

     # convert to local timezone
     station_data.coords["time"] = correct_timezone(station_data.to_pandas(), station_meta, sampling).index
-    station_data = station_data.rename({"time": time_dim})
+    station_data = station_data.rename({"time": time_dim, "variable": target_dim})

     # expand window_dim
     station_data = station_data.expand_dims({window_dim: [0]})

     # create meta data
-    variable_meta = _emulate_meta_data(station_data.coords["variable"].values)
+    variable_meta = _emulate_meta_data(station_data.coords[target_dim].values)
     meta = combine_meta_data(station_meta, variable_meta)
     meta = pd.DataFrame.from_dict(meta, orient='index')
     meta.columns = station_name
diff --git a/mlair/helpers/data_sources/ifs.py b/mlair/helpers/data_sources/ifs.py
index 27faab3e..eba40dcc 100644
--- a/mlair/helpers/data_sources/ifs.py
+++ b/mlair/helpers/data_sources/ifs.py
@@ -20,7 +20,7 @@ from mlair.helpers.data_sources.data_loader import EmptyQueryResult
 from mlair.helpers.meteo import relative_humidity_from_dewpoint


-def load_ifs(station_name, stat_var, sampling, data_origin, lead_time_dim, initial_time_dim):
+def load_ifs(station_name, stat_var, sampling, data_origin, lead_time_dim, initial_time_dim, target_dim):

     # make sure station_name parameter is a list
     station_name = helpers.to_list(station_name)
@@ -69,9 +69,10 @@ def load_ifs(station_name, stat_var, sampling, data_origin, lead_time_dim, initi
                                                      sampling).index

     # rename lead time and initial time to MLAir's internal dimension names
-    station_data = station_data.rename({"lead_time": lead_time_dim, "initial_time": initial_time_dim})
+    station_data = station_data.rename({"lead_time": lead_time_dim, "initial_time": initial_time_dim,
+                                        "variable": target_dim})

-    variable_meta = _emulate_meta_data(station_data.coords["variable"].values)
+    variable_meta = _emulate_meta_data(station_data.coords[target_dim].values)
     meta = combine_meta_data(station_meta, variable_meta)
     meta = pd.DataFrame.from_dict(meta, orient='index')
     meta.columns = station_name
diff --git a/mlair/helpers/data_sources/toar_data.py b/mlair/helpers/data_sources/toar_data.py
index 1181318c..f5c12df4 100644
--- a/mlair/helpers/data_sources/toar_data.py
+++ b/mlair/helpers/data_sources/toar_data.py
@@ -6,9 +6,10 @@ from . import join, toar_data_v2
 import requests
 import pandas as pd
 from .data_loader import EmptyQueryResult
+import xarray as xr


-def download_toar(station, toar_stats, sampling, data_origin):
+def download_toar(station, toar_stats, sampling, data_origin, window_dim, time_dim, target_dim):
     try:
         # load data from toar-data (v2)
         df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=data_origin)
@@ -29,7 +30,9 @@ def download_toar(station, toar_stats, sampling, data_origin):
     else:
         df_merged = df_toar if df_toar is not None else df_join
         meta_merged = meta_toar if df_toar is not None else meta_join
-    return df_merged, meta_merged
+    xarr_merged = xr.DataArray(df_merged, dims=[time_dim, target_dim])
+    xarr_merged = xarr_merged.expand_dims({window_dim: [0]})
+    return xarr_merged, meta_merged


 def merge_toar_join(df_toar, df_join, sampling):
diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py
index f834895d..1ba0a914 100644
--- a/mlair/helpers/filter.py
+++ b/mlair/helpers/filter.py
@@ -7,7 +7,7 @@ import datetime
 import numpy as np
 import pandas as pd
 from matplotlib import pyplot as plt
-from scipy import signal
+from scipy import signal, stats
 import xarray as xr
 import dask.array as da

@@ -18,7 +18,7 @@ class FIRFilter:
     from mlair.plotting.data_insight_plotting import PlotFirFilter

     def __init__(self, data, fs, order, cutoff, window, var_dim, time_dim, display_name=None, minimum_length=None,
-                 offset=0, plot_path=None, plot_dates=None):
+                 extend_end=0, plot_path=None, plot_dates=None, offset=0):
         self._filtered = []
         self._h = []
         self.data = data
@@ -30,6 +30,7 @@ class FIRFilter:
         self.time_dim = time_dim
         self.display_name = display_name
         self.minimum_length = minimum_length
+        self.extend_end = extend_end
         self.offset = offset
         self.plot_path = plot_path
         self.plot_dates = plot_dates
@@ -57,7 +58,7 @@ class FIRFilter:

             # visualization
             plot_data.append(self.create_visualization(fi, input_data, self.plot_dates, self.time_dim, self.fs, hi,
-                                                       self.minimum_length, self.order, i, self.offset, self.var_dim))
+                                                       self.minimum_length, self.order, i, self.extend_end, self.var_dim))

             # calculate residuum
             input_data = input_data - fi
@@ -75,7 +76,7 @@ class FIRFilter:
             logging.info(f"Could not plot climate fir filter due to following reason:\n{e}")

     def create_visualization(self, filtered, filter_input_data, plot_dates, time_dim, sampling,
-                             h, minimum_length, order, i, offset, var_dim):  # pragma: no cover
+                             h, minimum_length, order, i, extend_end, var_dim):  # pragma: no cover
         plot_data = []
         minimum_length = minimum_length or 0
         for viz_date in set(plot_dates).intersection(filtered.coords[time_dim].values):
@@ -88,7 +89,7 @@ class FIRFilter:
                 extend_length_history = minimum_length + int((length + 1) / 2)
                 extend_length_future = int((length + 1) / 2) + 1
                 t_minus = viz_date + np.timedelta64(int(-extend_length_history), td_type)
-                t_plus = viz_date + np.timedelta64(int(extend_length_future + offset), td_type)
+                t_plus = viz_date + np.timedelta64(int(extend_length_future + extend_end), td_type)
                 time_slice = slice(t_minus, t_plus - np.timedelta64(1, td_type))
                 plot_data.append({"t0": viz_date, "filter_input": filter_input_data.sel({time_dim: time_slice}),
                                   "filtered": filtered.sel({time_dim: time_slice}), "h": h, "time_dim": time_dim,
@@ -174,7 +175,7 @@ class ClimateFIRFilter(FIRFilter):
     def __init__(self, data, fs, order, cutoff, window, time_dim, var_dim,
                 apriori=None, apriori_type=None, apriori_diurnal=False, sel_opts=None, plot_path=None,
                 minimum_length=None, new_dim=None, display_name=None, extend_length_opts: int = 0,
-                 offset: Union[dict, int] = 0, plot_dates=None):
+                 extend_end: Union[dict, int] = 0, plot_dates=None, offset: int = 0):
         """
         :param data: data to filter
         :param fs: sampling frequency in 1/days -> 1d: fs=1 -> 1H: fs=24
@@ -196,11 +197,12 @@ class ClimateFIRFilter(FIRFilter):
         :param extend_length_opts: shift information switch between historical data and apriori estimation by the given
             values (default None). Must either be a dictionary with keys available in var_dim or a single value that is
             applied to all data. This parameter has only influence on which information is available at t0 for the
-            filter calculcation but has no influence on the shape of the returned filtered data.
-        :param offset: This parameter indicates the number of time steps with ti>t0 to return of the filtered data. In
-            case the offset parameter is larger than the extend_lenght_opts parameter, this leads to the case that not
-            only observational data but also climatological estimations are returned. Must either be a dictionary with
-            keys available in var_dim or a single value that is applied to all data. Default is 0.
+            filter calculation but has no influence on the shape of the returned filtered data.
+        :param extend_end: This parameter indicates the number of time steps with ti>t0 to return from the filtered
+            data. In case the extend_end parameter is larger than the extend_length_opts parameter, this leads to the
+            case that not only observational data but also climatological estimations are returned. Must either be a
+            dictionary with keys available in var_dim or a single value that is applied to all data. Default is 0.
+        :param offset: Shift t0 by this timestep delta. Default is 0.
         """
         self._apriori = apriori
         self.apriori_type = apriori_type
@@ -211,15 +213,22 @@ class ClimateFIRFilter(FIRFilter):
         self.plot_data = []
         self.extend_length_opts = extend_length_opts
         super().__init__(data, fs, order, cutoff, window, var_dim, time_dim, display_name=display_name,
-                         minimum_length=minimum_length, plot_path=plot_path, offset=offset, plot_dates=plot_dates)
+                         minimum_length=minimum_length, plot_path=plot_path, extend_end=extend_end, plot_dates=plot_dates, offset=offset)

     def run(self):
         filtered = []
         h = []
+        forecasts = None
         if self.sel_opts is not None:
             self.sel_opts = self.sel_opts if isinstance(self.sel_opts, dict) else {self.time_dim: self.sel_opts}
             self._check_sel_opts()
         sampling = {1: "1d", 24: "1H"}.get(int(self.fs))
+
+        if self.new_dim in self.data.coords:
+            pseudo_timeseries = self.create_pseudo_timeseries(self.data, self.time_dim, sampling, self.new_dim)
+            forecasts = self.data.__deepcopy__()
+            self.data = pseudo_timeseries
+
         logging.debug(f"{self.display_name}: create diurnal_anomalies")
         if self.apriori_diurnal is True and sampling == "1H":
             diurnal_anomalies = self.create_seasonal_hourly_mean(self.data, self.time_dim, sel_opts=self.sel_opts,
@@ -251,13 +260,14 @@ class ClimateFIRFilter(FIRFilter):
                                                   minimum_length=self.minimum_length, new_dim=new_dim,
                                                   plot_dates=plot_dates, display_name=self.display_name,
                                                   extend_opts=self.extend_length_opts,
-                                                  offset=self.offset, next_order=next_order)
+                                                  extend_end=self.extend_end, next_order=next_order,
+                                                  forecasts=forecasts, offset=self.offset)
             logging.info(f"{self.display_name}: finished clim_filter calculation")

             if self.minimum_length is None:
-                filtered.append(fi.sel({new_dim: slice(None, self.offset)}))
+                filtered.append(fi.sel({new_dim: slice(None, self.extend_end)}))
             else:
-                filtered.append(fi.sel({new_dim: slice(self.offset - self.minimum_length, self.offset)}))
+                filtered.append(fi.sel({new_dim: slice(self.extend_end - self.minimum_length, self.extend_end)}))
             h.append(hi)
             gc.collect()
             self.plot_data.append(plot_data)
@@ -293,9 +303,9 @@ class ClimateFIRFilter(FIRFilter):

         # add last residuum to filtered
         if self.minimum_length is None:
-            filtered.append(input_data.sel({new_dim: slice(None, self.offset)}))
+            filtered.append(input_data.sel({new_dim: slice(None, self.extend_end)}))
         else:
-            filtered.append(input_data.sel({new_dim: slice(self.offset - self.minimum_length, self.offset)}))
+            filtered.append(input_data.sel({new_dim: slice(self.extend_end - self.minimum_length, self.extend_end)}))

         self._filtered = filtered
         self._h = h
@@ -534,9 +544,16 @@ class ClimateFIRFilter(FIRFilter):

         return apriori

+    @staticmethod
+    def get_forecast_run_delta(data, time_dim):
+        time_data = data.coords[time_dim].values
+        deltas = (time_data[1:] - time_data[:-1]) / np.timedelta64(1, "h")
+        return stats.mode(deltas).mode[0]
+
     def combine_observation_and_apriori(self, data: xr.DataArray, apriori: xr.DataArray, time_dim: str, new_dim: str,
                                         extend_length_history: int, extend_length_future: int,
-                                        extend_length_separator: int = 0) -> xr.DataArray:
+                                        extend_length_separator: int = 0, forecasts: xr.DataArray = None,
+                                        sampling: str = "1H", extend_end: int = 0, offset: int = 0) -> xr.DataArray:
         """
         Combine historical data / observations ("data") and climatological statistics ("apriori"). Historical data are
         used on interval [t0 - extend_length_history, t0] and apriori is used on [t0 + 1, t0 + extend_length_future]. If
@@ -557,13 +574,40 @@ class ClimateFIRFilter(FIRFilter):
         :returns: combined data array
         """
         # prepare historical data / observation
-        ext_sep = min(extend_length_separator, extend_length_future)
+        if forecasts is None:
+            ext_sep = offset + min(extend_end, extend_length_future)
+        else:
+            ext_sep = min(offset, extend_length_future)
         if new_dim not in data.coords:
             history = self._shift_data(data, range(int(-extend_length_history), ext_sep + 1), time_dim, new_dim)
         else:
             history = data.sel({new_dim: slice(int(-extend_length_history), ext_sep)})

+        if forecasts is not None:
+            forecast_delta = self.get_forecast_run_delta(forecasts, time_dim)
+            for lead_time in forecasts.coords[new_dim]:
+                delta = np.timedelta64(int(lead_time - offset), {"1d": "D", "1H": "h"}.get(sampling))
+                forecasts_tmp = forecasts.sel({new_dim: slice(None, extend_end)})
+                forecasts_tmp.coords[time_dim] = forecasts_tmp.coords[time_dim] + delta
+                forecasts_tmp.coords[new_dim] = forecasts_tmp.coords[new_dim] + offset - lead_time
+                history = history.combine_first(forecasts_tmp)
+                print(lead_time)
+                # history.plot()
+                if lead_time >= forecast_delta - 1:  # stop when all gaps are filled
+                    break
+
+            # time_point = history.coords["datetime"][11]
+            # self._shift_data(data, range(int(-extend_length_history), ext_sep + 21), time_dim, new_dim).sel(
+            #     datetime=time_point).plot()
+            # history.sel(datetime=time_point).plot()
+            # history.sel(datetime=history.coords["datetime"][0])
+            # history.sel(datetime=history.coords["datetime"][1])
+            # history.sel(datetime=history.coords["datetime"][11]).plot()
+            # history.sel(datetime=history.coords["datetime"][12]).plot()
+            # history.sel(datetime=history.coords["datetime"][46]).plot()
+            # history.sel(datetime=history.coords["datetime"][47]).plot()
+
         if extend_length_future > ext_sep + 1:
             # prepare climatological statistics
             if new_dim not in apriori.coords:
@@ -571,19 +615,43 @@ class ClimateFIRFilter(FIRFilter):
                                           extend_length_future + 1), time_dim, new_dim)
             else:
-                future = apriori.sel({new_dim: slice(ext_sep + 1,
-                                                     extend_length_future)})
+                future = apriori.sel({new_dim: slice(ext_sep + 1, extend_length_future)})

             # combine historical data [t0-length,t0+sep] and climatological statistics [t0+sep+1,t0+length]
-            filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left")
+            filter_input_data = history.combine_first(future)
+            # filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left")
             return filter_input_data
         else:
             return history

+    @staticmethod
+    def create_full_time_dim(data, dim, freq):
+        """Ensure the time dimension is equidistant. Sometimes dates are missing if values have been dropped."""
+        start = data.coords[dim].values[0]
+        end = data.coords[dim].values[-1]
+        datetime_index = pd.DataFrame(index=pd.date_range(start, end, freq=freq))
+        t = data.sel({dim: start}, drop=True)
+        res = xr.DataArray(coords=[datetime_index.index, *[t.coords[c] for c in t.coords]], dims=[dim, *t.coords])
+        res = res.transpose(*data.dims)
+        res.loc[data.coords] = data
+        return res
+
+    def create_pseudo_timeseries(self, data, time_dim, sampling, window_dim):
+        empty_data = data.sel({window_dim: 0}, drop=True) * np.nan
+        data_full_time_index = self.create_full_time_dim(empty_data, time_dim, sampling)
+        for lead_time in data.coords[window_dim]:
+            data_tmp = data.sel({window_dim: lead_time}, drop=True)
+            delta = np.timedelta64(int(lead_time), {"1d": "D", "1H": "h"}.get(sampling))
+            data_tmp.coords[time_dim] = data_tmp.coords[time_dim] + delta
+            data_full_time_index = data_full_time_index.combine_first(data_tmp)
+            if data_full_time_index.isnull().sum() == 0:  # stop when all gaps are filled
+                break
+        return data_full_time_index
+
     def create_visualization(self, filtered, data, filter_input_data, plot_dates, time_dim, new_dim,
                              sampling, extend_length_history, extend_length_future, minimum_length, h,
-                             variable_name, extend_length_opts=None, offset=None):  # pragma: no cover
+                             variable_name, extend_length_opts=None, extend_end=None):  # pragma: no cover
         plot_data = []
-        offset = 0 if offset is None else offset
+        extend_end = 0 if extend_end is None else extend_end
         extend_length_opts = 0 if extend_length_opts is None else extend_length_opts
         for t0 in set(plot_dates).intersection(filtered.coords[time_dim].values):
             try:
@@ -599,7 +667,7 @@ class ClimateFIRFilter(FIRFilter):
                 else:
                     tmp_filter_data = None
                 valid_start = int(filtered[new_dim].min()) + int((len(h) + 1) / 2)
-                valid_end = min(extend_length_opts + offset + 1, int(filtered[new_dim].max()) - int((len(h) + 1) / 2))
+                valid_end = min(extend_length_opts + extend_end + 1, int(filtered[new_dim].max()) - int((len(h) + 1) / 2))
                 valid_range = range(valid_start, valid_end)
                 plot_data.append({"t0": t0, "var": variable_name,
@@ -686,7 +754,7 @@ class ClimateFIRFilter(FIRFilter):
     def clim_filter(self, data, fs, cutoff_high, order, apriori=None, sel_opts=None, sampling="1d",
                     time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming", minimum_length=0,
                     next_order=0, new_dim="window", plot_dates=None, display_name=None,
-                    extend_opts: int = 0, offset: int = 0):
+                    extend_opts: int = 0, extend_end: int = 0, forecasts=None, offset: int = 0):

         logging.debug(f"{display_name}: extend apriori")

@@ -701,8 +769,8 @@ class ClimateFIRFilter(FIRFilter):
         length = len(h)

         # set data extend that is required for filtering
-        extend_length_history = minimum_length + int((next_order + 1) / 2) + int((length + 1) / 2) - offset
-        extend_length_future = offset + int((next_order + 1) / 2) + int((length + 1) / 2)
+        extend_length_history = minimum_length + int((next_order + 1) / 2) + int((length + 1) / 2) - extend_end
+        extend_length_future = extend_end + int((next_order + 1) / 2) + int((length + 1) / 2)

         # collect some data for visualization
         plot_pos = np.array([0.25, 1.5, 2.75, 4]) * 365 * fs
@@ -728,12 +796,14 @@ class ClimateFIRFilter(FIRFilter):
                                                              _year, sampling, max(extend_length_history, extend_length_future))
                 d = data.sel({var_dim: [var], time_dim: time_slice})
                 a = apriori.sel({var_dim: [var], time_dim: time_slice})
+                f = forecasts.sel({var_dim: [var]})
                 if len(d.coords[time_dim]) == 0:  # no data at all for this year
                    continue

                # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length]
                filter_input_data = self.combine_observation_and_apriori(d, a, time_dim, new_dim, extend_length_history,
-                                                                         extend_length_future, extend_length_separator=extend_opts)
+                                                                         extend_length_future, extend_length_separator=extend_opts, forecasts=f, sampling=sampling,
+                                                                         extend_end=extend_end, offset=offset)

                # select only data for current year
                try:
@@ -752,7 +822,7 @@ class ClimateFIRFilter(FIRFilter):

                # trim data if required
                valid_range_end = int(filt.coords[new_dim].max() - (length + 1) / 2) + 1
-                ext_len = min(extend_length_future, valid_range_end)
+                ext_len = min(extend_length_future, valid_range_end)  # TODO: start here and adjust by offset parameter
                trimmed = self._trim_data_to_minimum_length(filt, extend_length_history, new_dim,
                                                            extend_length_future=ext_len)
                filt_coll.append(trimmed)
@@ -763,7 +833,7 @@ class ClimateFIRFilter(FIRFilter):
                # visualization
                plot_data.extend(self.create_visualization(filt, d, filter_input_data, plot_dates, time_dim, new_dim,
                                                           sampling, extend_length_history, extend_length_future,
-                                                           minimum_length, h, var, extend_opts, offset))
+                                                           minimum_length, h, var, extend_opts, extend_end))

            # collect all filter results
            coll.append(xr.concat(filt_coll, time_dim))
-- 
GitLab
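Below the patch, purely as an illustrative sketch and not part of the commit, is a minimal standalone example of the core idea introduced here: forecast lead times are shifted onto the valid-time axis and merged with combine_first, so values from shorter lead times win and longer lead times only fill the remaining gaps, mirroring what create_pseudo_timeseries and the forecast loop in combine_observation_and_apriori do. All names and toy values below are assumptions for demonstration; "datetime" and "window" only mimic MLAir's default time and window dimensions.

# Illustrative sketch only (not part of the commit): merge forecast lead times into one
# continuous pseudo time series, preferring the shortest available lead time per time step.
import numpy as np
import pandas as pd
import xarray as xr

time = pd.date_range("2020-01-01", periods=6, freq="h")
lead_times = [0, 1, 2]
# toy forecast values: integer part encodes the run, fractional part the lead time
values = np.array([[i + 0.1 * lt for lt in lead_times] for i in range(len(time))])
forecasts = xr.DataArray(values, coords={"datetime": time, "window": lead_times},
                         dims=["datetime", "window"])

# empty series spanning the full valid-time range that the forecasts can reach
full_index = pd.date_range(time[0], periods=len(time) + max(lead_times), freq="h")
pseudo = xr.DataArray(np.full(len(full_index), np.nan),
                      coords={"datetime": full_index}, dims=["datetime"])

for lt in lead_times:  # shortest lead time first
    shifted = forecasts.sel(window=lt, drop=True)
    shifted = shifted.assign_coords(datetime=shifted["datetime"] + np.timedelta64(lt, "h"))
    pseudo = pseudo.combine_first(shifted)  # only fills gaps, existing values win
    if int(pseudo.isnull().sum()) == 0:  # stop when all gaps are filled
        break

print(pseudo.values)

Because combine_first keeps the values already present in the calling array, iterating from the shortest to the longest lead time reproduces the "stop when all gaps are filled" early exit used in the patch.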