diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index cb381a2a4875ae6cc5bb3d4951624cb562aaf72f..5466b6dacc1fe6829576ee721e044fd2da188680 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -308,6 +308,7 @@ class DataHandlerMixedSamplingWithClimateAndFirFilter(DataHandlerMixedSamplingWi data_handler_climate_fir = DataHandlerMixedSamplingWithClimateFirFilterSingleStation data_handler_fir = (DataHandlerMixedSamplingWithFirFilterSingleStation, DataHandlerMixedSamplingWithClimateFirFilterSingleStation) + data_handler_fir_pos = None data_handler = None data_handler_unfiltered = DataHandlerMixedSamplingSingleStation _requirements = list(set(data_handler_climate_fir.requirements() + data_handler_fir[0].requirements() + @@ -351,50 +352,51 @@ class DataHandlerMixedSamplingWithClimateAndFirFilter(DataHandlerMixedSamplingWi if len(chem_vars) > 0: sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler_climate_fir.requirements() if k in kwargs} sp_keys = cls.build_update_kwargs(sp_keys, dh_type="filtered_chem") - sp_keys.update({"variables": chem_vars}) - cls.adjust_window_opts("chem", "window_history_size", sp_keys) - cls.adjust_window_opts("chem", "window_history_offset", sp_keys) - cls.adjust_window_opts("chem", "extend_length_opts", sp_keys) + + cls.prepare_build(sp_keys, chem_vars, "chem") sp_chem = cls.data_handler_climate_fir(station, **sp_keys) if filter_add_unfiltered is True: sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler_unfiltered.requirements() if k in kwargs} sp_keys = cls.build_update_kwargs(sp_keys, dh_type="unfiltered_chem") - sp_keys.update({"variables": chem_vars}) - cls.adjust_window_opts("chem", "window_history_size", sp_keys) - cls.adjust_window_opts("chem", "window_history_offset", sp_keys) - cls.adjust_window_opts("chem", "extend_length_opts", sp_keys) + cls.prepare_build(sp_keys, chem_vars, 
"chem") sp_chem_unfiltered = cls.data_handler_unfiltered(station, **sp_keys) if len(meteo_vars) > 0: - if "extend_length_opts" in kwargs: - cls.data_handler_fir = cls.data_handler_fir[1] # use slower fir version with climate estimate - else: - cls.data_handler_fir = cls.data_handler_fir[0] # use faster fir version without climate estimate - sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler_fir.requirements() if k in kwargs} + if cls.data_handler_fir_pos is None: + if "extend_length_opts" in kwargs: + cls.data_handler_fir_pos = 1 # use slower fir version with climate estimate + else: + cls.data_handler_fir_pos = 0 # use faster fir version without climate estimate + sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler_fir[cls.data_handler_fir_pos].requirements() if k in kwargs} sp_keys = cls.build_update_kwargs(sp_keys, dh_type="filtered_meteo") - sp_keys.update({"variables": meteo_vars}) - cls.adjust_window_opts("meteo", "window_history_size", sp_keys) - cls.adjust_window_opts("meteo", "window_history_offset", sp_keys) - cls.adjust_window_opts("meteo", "extend_length_opts", sp_keys) - sp_meteo = cls.data_handler_fir(station, **sp_keys) + cls.prepare_build(sp_keys, meteo_vars, "meteo") + sp_meteo = cls.data_handler_fir[cls.data_handler_fir_pos](station, **sp_keys) if filter_add_unfiltered is True: sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler_unfiltered.requirements() if k in kwargs} sp_keys = cls.build_update_kwargs(sp_keys, dh_type="unfiltered_meteo") - sp_keys.update({"variables": meteo_vars}) - cls.adjust_window_opts("meteo", "window_history_size", sp_keys) - cls.adjust_window_opts("meteo", "window_history_offset", sp_keys) - cls.adjust_window_opts("meteo", "extend_length_opts", sp_keys) + cls.prepare_build(sp_keys, meteo_vars, "meteo") sp_meteo_unfiltered = cls.data_handler_unfiltered(station, **sp_keys) dp_args = {k: copy.deepcopy(kwargs[k]) for k in cls.own_args("id_class") if k in kwargs} return cls(sp_chem, 
sp_meteo, sp_chem_unfiltered, sp_meteo_unfiltered, chem_vars, meteo_vars, **dp_args) + @classmethod + def prepare_build(cls, kwargs, var_list, var_type): + kwargs.update({"variables": var_list}) + cls.adjust_window_opts(var_type, "window_history_size", kwargs) + cls.adjust_window_opts(var_type, "window_history_offset", kwargs) + cls.adjust_window_opts(var_type, "window_history_end", kwargs) + cls.adjust_window_opts(var_type, "extend_length_opts", kwargs) + @staticmethod def adjust_window_opts(key: str, parameter_name: str, kwargs: dict): - if parameter_name in kwargs: - window_opt = kwargs.pop(parameter_name) - if isinstance(window_opt, dict): - window_opt = window_opt.get(key) - kwargs[parameter_name] = window_opt + try: + if parameter_name in kwargs: + window_opt = kwargs.pop(parameter_name) + if isinstance(window_opt, dict): + window_opt = window_opt[key] + kwargs[parameter_name] = window_opt + except KeyError: + pass def _create_collection(self): collection = super()._create_collection() @@ -414,21 +416,15 @@ class DataHandlerMixedSamplingWithClimateAndFirFilter(DataHandlerMixedSamplingWi # chem transformation kwargs_chem = copy.deepcopy(kwargs) - kwargs_chem["variables"] = chem_vars - cls.adjust_window_opts("chem", "window_history_size", kwargs_chem) - cls.adjust_window_opts("chem", "window_history_offset", kwargs_chem) - cls.adjust_window_opts("chem", "extend_length_opts", kwargs_chem) + cls.prepare_build(kwargs_chem, chem_vars, "chem") dh_transformation = (cls.data_handler_climate_fir, cls.data_handler_unfiltered) transformation_chem = super().transformation(set_stations, tmp_path=tmp_path, dh_transformation=dh_transformation, **kwargs_chem) # meteo transformation kwargs_meteo = copy.deepcopy(kwargs) - kwargs_meteo["variables"] = meteo_vars - cls.adjust_window_opts("meteo", "window_history_size", kwargs_meteo) - cls.adjust_window_opts("meteo", "window_history_offset", kwargs_meteo) - cls.adjust_window_opts("meteo", "extend_length_opts", kwargs_meteo) - 
dh_transformation = (cls.data_handler_fir, cls.data_handler_unfiltered) + cls.prepare_build(kwargs_meteo, meteo_vars, "meteo") + dh_transformation = (cls.data_handler_fir[cls.data_handler_fir_pos or 0], cls.data_handler_unfiltered) transformation_meteo = super().transformation(set_stations, tmp_path=tmp_path, dh_transformation=dh_transformation, **kwargs_meteo) @@ -462,76 +458,3 @@ class DataHandlerMixedSamplingWithClimateAndFirFilter(DataHandlerMixedSamplingWi return X else: return super().get_X_original() - - -# -# class DataHandlerSeparationOfScalesSingleStation(DataHandlerMixedSamplingWithKzFilterSingleStation): -# """ -# Data handler using mixed sampling for input and target. Inputs are temporal filtered and depending on the -# separation frequency of a filtered time series the time step delta for input data is adjusted (see image below). -# -# .. image:: ../../../../../_source/_plots/separation_of_scales.png -# :width: 400 -# -# """ -# -# _requirements = DataHandlerMixedSamplingWithKzFilterSingleStation.requirements() -# _hash = DataHandlerMixedSamplingWithKzFilterSingleStation._hash + ["time_delta"] -# -# def __init__(self, *args, time_delta=np.sqrt, **kwargs): -# assert isinstance(time_delta, Callable) -# self.time_delta = time_delta -# super().__init__(*args, **kwargs) -# -# def make_history_window(self, dim_name_of_inputs: str, window: int, dim_name_of_shift: str) -> None: -# """ -# Create a xr.DataArray containing history data. -# -# Shift the data window+1 times and return a xarray which has a new dimension 'window' containing the shifted -# data. This is used to represent history in the data. Results are stored in history attribute. -# -# :param dim_name_of_inputs: Name of dimension which contains the input variables -# :param window: number of time steps to look back in history -# Note: window will be treated as negative value. This should be in agreement with looking back on -# a time line. 
Nonetheless positive values are allowed but they are converted to its negative -# expression -# :param dim_name_of_shift: Dimension along shift will be applied -# """ -# window = -abs(window) -# data = self.input_data -# self.history = self.stride(data, dim_name_of_shift, window, offset=self.window_history_offset) -# -# def stride(self, data: xr.DataArray, dim: str, window: int, offset: int = 0) -> xr.DataArray: -# time_deltas = np.round(self.time_delta(self.cutoff_period)).astype(int) -# start, end = window, 1 -# res = [] -# _range = list(map(lambda x: x + offset, range(start, end))) -# window_array = self.create_index_array(self.window_dim, _range, squeeze_dim=self.target_dim) -# for delta, filter_name in zip(np.append(time_deltas, 1), data.coords["filter"]): -# res_filter = [] -# data_filter = data.sel({"filter": filter_name}) -# for w in _range: -# res_filter.append(data_filter.shift({dim: -(w - offset) * delta - offset})) -# res_filter = xr.concat(res_filter, dim=window_array).chunk() -# res.append(res_filter) -# res = xr.concat(res, dim="filter").compute() -# return res -# -# def estimate_filter_width(self): -# """ -# Attention: this method returns the maximum value of -# * either estimated filter width f = 0.5 / (len * sqrt(itr)) -> T = 1 / f or -# * time delta method applied on the estimated filter width mupliplied by window_history_size -# to provide a sufficiently wide filter width. -# """ -# est = self.kz_filter_length[0] * np.sqrt(self.kz_filter_iter[0]) * 2 -# return int(max([self.time_delta(est) * self.window_history_size, est])) -# -# -# class DataHandlerSeparationOfScales(DefaultDataHandler): -# """Data handler using mixed sampling for input and target. 
Inputs are temporal filtered and different time step -# sizes are applied in relation to frequencies.""" -# -# data_handler = DataHandlerSeparationOfScalesSingleStation -# data_handler_transformation = DataHandlerSeparationOfScalesSingleStation -# _requirements = data_handler.requirements() diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index ec20c9d02221f3096b6f372cb0b81e96d41b6ff2..13092a94820408b7e138afe58fd29d5d5465fc80 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -33,13 +33,10 @@ data_or_none = Union[xr.DataArray, None] class DataHandlerSingleStation(AbstractDataHandler): """ - :param window_history_offset: used to include future values into history / inputs. Example: set - window_history_size=48 and window_history_offset=12 to use 2 days as history / input but not on interval t0 + - [-48h, 0h] but t0 + [-36h, 12h]. This parameter is especially used together with a 1d target variable (mixed - sampling) that measurement interval is not from 0000 to 0000 but 1700 (day before) to 1700 (day of t0) or - similar (e.g. dma8eu ozone). Then it is possible to use window_history_size=48+16 and window_history_offset=16 - to use 3 days of input without all timesteps starting at 1700 the day before (as they are included in the - target). + :param window_history_offset: used to shift t0 according to the specified value. + :param window_history_end: used to set the last time step that is used to create a sample. A negative value + indicates that not all values up to t0 are used, a positive values indicates usage of values at t>t0. Default + is 0. 
""" DEFAULT_STATION_TYPE = "background" DEFAULT_NETWORK = "AIRBASE" @@ -49,6 +46,7 @@ class DataHandlerSingleStation(AbstractDataHandler): DEFAULT_WINDOW_LEAD_TIME = 3 DEFAULT_WINDOW_HISTORY_SIZE = 13 DEFAULT_WINDOW_HISTORY_OFFSET = 0 + DEFAULT_WINDOW_HISTORY_END = 0 DEFAULT_TIME_DIM = "datetime" DEFAULT_TARGET_VAR = "o3" DEFAULT_TARGET_DIM = "variables" @@ -62,14 +60,14 @@ class DataHandlerSingleStation(AbstractDataHandler): _hash = ["station", "statistics_per_var", "data_origin", "station_type", "network", "sampling", "target_dim", "target_var", "time_dim", "iter_dim", "window_dim", "window_history_size", "window_history_offset", - "window_lead_time", "interpolation_limit", "interpolation_method", "variables"] + "window_lead_time", "interpolation_limit", "interpolation_method", "variables", "window_history_end"] def __init__(self, station, data_path, statistics_per_var=None, station_type=DEFAULT_STATION_TYPE, network=DEFAULT_NETWORK, sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING, target_dim=DEFAULT_TARGET_DIM, target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM, iter_dim=DEFAULT_ITER_DIM, window_dim=DEFAULT_WINDOW_DIM, window_history_size=DEFAULT_WINDOW_HISTORY_SIZE, window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET, - window_lead_time=DEFAULT_WINDOW_LEAD_TIME, + window_history_end=DEFAULT_WINDOW_HISTORY_END, window_lead_time=DEFAULT_WINDOW_LEAD_TIME, interpolation_limit: Union[int, Tuple[int]] = DEFAULT_INTERPOLATION_LIMIT, interpolation_method: Union[str, Tuple[str]] = DEFAULT_INTERPOLATION_METHOD, overwrite_local_data: bool = False, transformation=None, store_data_locally: bool = True, @@ -99,6 +97,7 @@ class DataHandlerSingleStation(AbstractDataHandler): self.window_dim = window_dim self.window_history_size = window_history_size self.window_history_offset = window_history_offset + self.window_history_end = window_history_end self.window_lead_time = window_lead_time self.interpolation_limit = interpolation_limit @@ -316,7 +315,7 @@ class 
DataHandlerSingleStation(AbstractDataHandler): self.target_data = targets def make_samples(self): - self.make_history_window(self.target_dim, self.window_history_size, self.time_dim) + self.make_history_window(self.target_dim, self.window_history_size, self.time_dim) #todo stopped here self.make_labels(self.target_dim, self.target_var, self.time_dim, self.window_lead_time) self.make_observation(self.target_dim, self.target_var, self.time_dim) self.remove_nan(self.time_dim) @@ -558,7 +557,8 @@ class DataHandlerSingleStation(AbstractDataHandler): """ window = -abs(window) data = self.input_data - self.history = self.shift(data, dim_name_of_shift, window, offset=self.window_history_offset) + offset = self.window_history_offset + self.window_history_end + self.history = self.shift(data, dim_name_of_shift, window, offset=offset) def make_labels(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str, window: int) -> None: diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py index 95d9f66d9c36da4c5f75fa079e2b3d2675108c64..448db5929e1bc71fa18e1a6ed32fb41ea3142f1e 100644 --- a/mlair/data_handler/data_handler_with_filter.py +++ b/mlair/data_handler/data_handler_with_filter.py @@ -333,12 +333,13 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation parameter exceeds the `extend_length_opts`, it has also an influence on the filter component calculation as it will return also climatological estimates. 
""" + DEFAULT_EXTEND_LENGTH_OPTS = 0 _hash = DataHandlerFirFilterSingleStation._hash + ["apriori_type", "apriori_sel_opts", "apriori_diurnal", "extend_length_opts"] _store_attributes = DataHandlerFirFilterSingleStation.store_attributes() + ["apriori"] def __init__(self, *args, apriori=None, apriori_type=None, apriori_diurnal=False, apriori_sel_opts=None, - name_affix=None, extend_length_opts=None, **kwargs): + name_affix=None, extend_length_opts=DEFAULT_EXTEND_LENGTH_OPTS, **kwargs): self.apriori_type = apriori_type self.climate_filter_coeff = None # coefficents of the used FIR filter self.apriori = apriori # exogenous apriori information or None to calculate from data (endogenous) @@ -362,7 +363,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation plot_path=self.plot_path, minimum_length=self.window_history_size, new_dim=self.window_dim, station_name=self.station[0], extend_length_opts=self.extend_length_opts, - offset=self.window_history_offset) + offset=self.window_history_end) self.climate_filter_coeff = climate_filter.filter_coefficients # store apriori information: store all if residuum_stat method was used, otherwise just store initial apriori @@ -372,19 +373,9 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation self.apriori = climate_filter.initial_apriori_data self.all_apriori = climate_filter.apriori_data - if isinstance(self.window_history_offset, int): - climate_filter_data = [c.sel({self.window_dim: slice(-self.window_history_size + self.window_history_offset, - self.window_history_offset)}) - for c in climate_filter.filtered_data] - else: - climate_filter_data = [] - for c in climate_filter.filtered_data: - coll_tmp = [] - for v in c.coords[self.target_dim].values: - delta_lim = self.window_history_offset.get(v, 0) - coll_tmp.append(c.sel({self.target_dim: v, - self.window_dim: slice(-self.window_history_size + delta_lim, delta_lim)})) - climate_filter_data.append(xr.concat(coll_tmp, 
self.target_dim)) + climate_filter_data = [c.sel({self.window_dim: slice(self.window_history_end-self.window_history_size, + self.window_history_end)}) + for c in climate_filter.filtered_data] # create input data with filter index input_data = xr.concat(climate_filter_data, pd.Index(self.create_filter_index(add_unfiltered_index=False), @@ -452,14 +443,19 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation expression :param dim_name_of_shift: Dimension along shift will be applied """ - self.history = self.input_data.__deepcopy__() - # #todo stopped here. maybe there is nothing to do anymore? - # data = self.input_data #todo trim data to be only in range slice(None, self.window_history_offset) ?? - # sampling = {"daily": "D", "hourly": "h"}.get(to_list(self.sampling)[0]) - # data.coords[dim_name_of_shift] = data.coords[dim_name_of_shift] - np.timedelta64(self.window_history_offset, - # sampling) - # data.coords[self.window_dim] = data.coords[self.window_dim] + self.window_history_offset - # self.history = data + # self.history = self.input_data.__deepcopy__() + #todo stopped here. maybe there is nothing to do anymore? + data = self.input_data #todo trim data to be only in range slice(None, self.window_history_offset) ?? 
+ sampling = {"daily": "D", "hourly": "h"}.get(to_list(self.sampling)[0]) + data.coords[dim_name_of_shift] = data.coords[dim_name_of_shift] - np.timedelta64(self.window_history_offset, + sampling) + data.coords[self.window_dim] = data.coords[self.window_dim] + self.window_history_offset + self.history = data + # from matplotlib import pyplot as plt + # d = self.load_and_interpolate(0) + # data.sel(datetime="2007-07-07 00:00").sum("filter").plot() + # plt.plot(data.sel(datetime="2007-07-07 00:00").sum("filter").window.values, d.sel(datetime=slice("2007-07-05 00:00", "2007-07-07 16:00")).values.flatten()) + # plt.plot(data.sel(datetime="2007-07-07 00:00").sum("filter").window.values, d.sel(datetime=slice("2007-07-05 00:00", "2007-07-11 16:00")).values.flatten()) def call_transform(self, inverse=False): opts_input = self._transformation[0] diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index 0cf0efac691ef508b05b7c8c2a65cbe4c785fae1..3a67accdcd5d7868bb0c8c2c8ec68a63b90025ff 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -168,7 +168,7 @@ class ClimateFIRFilter(FIRFilter): def __init__(self, data, fs, order, cutoff, window, time_dim, var_dim, apriori=None, apriori_type=None, apriori_diurnal=False, sel_opts=None, plot_path=None, - minimum_length=None, new_dim=None, station_name=None, extend_length_opts: Union[dict, int] = None, + minimum_length=None, new_dim=None, station_name=None, extend_length_opts: int = 0, offset: Union[dict, int] = 0): """ :param data: data to filter @@ -234,15 +234,15 @@ class ClimateFIRFilter(FIRFilter): for i in range(len(self.order)): logging.info(f"{self.station_name}: start filter for order {self.order[i]}") # calculate climatological filter - _minimum_length = self._minimum_length(self.order, self.minimum_length, i, self.window) - fi, hi, apriori, plot_data = self.clim_filter(input_data, self.fs, self.cutoff[i], self.order[i], + next_order = self._next_order(self.order, 0, i, self.window) + fi, 
input_data, hi, apriori, plot_data = self.clim_filter(input_data, self.fs, self.cutoff[i], self.order[i], apriori=apriori_list[i], sel_opts=self.sel_opts, sampling=sampling, time_dim=self.time_dim, window=self.window, var_dim=self.var_dim, - minimum_length=_minimum_length, new_dim=new_dim, + minimum_length=self.minimum_length, new_dim=new_dim, plot_dates=plot_dates, station_name=self.station_name, - extend_length_opts=self.extend_length_opts, - offset=self.offset) + extend_opts=self.extend_length_opts, + offset=self.offset, next_order=next_order) logging.info(f"{self.station_name}: finished clim_filter calculation") # if self.minimum_length is None: #todo does slice(None,None) work? if yes this could be changed to slice(-self.minimum_length, self.offset) @@ -252,11 +252,11 @@ class ClimateFIRFilter(FIRFilter): if self.minimum_length is None: # todo does slice(None,None) work? if yes this could be changed to slice(-self.minimum_length, self.offset) filtered.append(fi.sel({new_dim: slice(None, self.offset)})) else: - filtered.append(fi.sel({new_dim: slice(-self.minimum_length, self.offset)})) + filtered.append(fi.sel({new_dim: slice(self.offset - self.minimum_length, self.offset)})) h.append(hi) gc.collect() self.plot_data.append(plot_data) - plot_dates = {e["t0"] for e in plot_data} + plot_dates = {e["viz_date"] for e in plot_data} # calculate residuum logging.info(f"{self.station_name}: calculate residuum") @@ -293,7 +293,7 @@ class ClimateFIRFilter(FIRFilter): if self.minimum_length is None: # todo does slice(None,None) work? 
if yes this could be changed to slice(-self.minimum_length, self.offset) filtered.append(input_data.sel({new_dim: slice(None, self.offset)})) else: - filtered.append(input_data.sel({new_dim: slice(-self.minimum_length, self.offset)})) + filtered.append(input_data.sel({new_dim: slice(self.offset - self.minimum_length, self.offset)})) self._filtered = filtered self._h = h @@ -307,7 +307,7 @@ class ClimateFIRFilter(FIRFilter): logging.info(f"Could not plot climate fir filter due to following reason:\n{e}") @staticmethod - def _minimum_length(order: list, minimum_length: Union[int, None], pos: int, window: Union[str, tuple]) -> int: + def _next_order(order: list, minimum_length: Union[int, None], pos: int, window: Union[str, tuple]) -> int: next_order = 0 if pos + 1 < len(order): next_order = order[pos + 1] @@ -315,7 +315,7 @@ class ClimateFIRFilter(FIRFilter): next_order = filter_width_kzf(*next_order) if minimum_length is not None: next_order = next_order + minimum_length - return next_order if next_order > 0 else None + return next_order @staticmethod def create_monthly_unity_array(data: xr.DataArray, time_dim: str, extend_range: int = 366) -> xr.DataArray: @@ -550,38 +550,42 @@ class ClimateFIRFilter(FIRFilter): interval and start of apriori interval are shifted by this value from t0 (positive or negative). 
:returns: combined data array """ - # check if shift indicated by extend_length_seperator is inside the outer interval limits - # assert (extend_length_separator > -extend_length_history) and (extend_length_separator < extend_length_future) - # prepare historical data / observation + ext_sep = min(extend_length_separator, extend_length_future) if new_dim not in data.coords: - history = self._shift_data(data, range(int(-extend_length_history), extend_length_separator + 1), + history = self._shift_data(data, range(int(-extend_length_history), ext_sep + 1), time_dim, new_dim) else: - history = data.sel({new_dim: slice(int(-extend_length_history), extend_length_separator)}) - # prepare climatological statistics - if new_dim not in apriori.coords: - future = self._shift_data(apriori, range(extend_length_separator + 1, - extend_length_separator + extend_length_future), - time_dim, new_dim) + history = data.sel({new_dim: slice(int(-extend_length_history), ext_sep)}) + + if extend_length_future > ext_sep + 1: + # prepare climatological statistics + if new_dim not in apriori.coords: + future = self._shift_data(apriori, range(ext_sep + 1, + extend_length_future + 1), + time_dim, new_dim) + else: + future = apriori.sel({new_dim: slice(ext_sep + 1, + extend_length_future)}) + # combine historical data [t0-length,t0+sep] and climatological statistics [t0+sep+1,t0+length] + filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") + return filter_input_data else: - future = apriori.sel({new_dim: slice(extend_length_separator + 1, - extend_length_separator + extend_length_future)}) - - # combine historical data [t0-length,t0+sep] and climatological statistics [t0+sep+1,t0+length] - filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") - return filter_input_data + return history def create_visualization(self, filtered, data, filter_input_data, plot_dates, time_dim, new_dim, sampling, extend_length_history, 
extend_length_future, minimum_length, h, - variable_name, extend_length_opts=None): # pragma: no cover + variable_name, extend_length_opts=None, offset=None): # pragma: no cover plot_data = [] + offset = 0 if offset is None else offset extend_length_opts = 0 if extend_length_opts is None else extend_length_opts for viz_date in set(plot_dates).intersection(filtered.coords[time_dim].values): try: td_type = {"1d": "D", "1H": "h"}.get(sampling) + # t0 = viz_date + np.timedelta64(int(offset), td_type) + t0 = viz_date # offset should be irrelevant for visualization t_minus = viz_date + np.timedelta64(int(-extend_length_history), td_type) - t_plus = viz_date + np.timedelta64(int(extend_length_future + extend_length_opts), td_type) + t_plus = t0 + np.timedelta64(int(extend_length_future + extend_length_opts), td_type) if new_dim not in data.coords: tmp_filter_data = self._shift_data(data.sel({time_dim: slice(t_minus, t_plus)}), range(int(-extend_length_history), @@ -590,9 +594,10 @@ class ClimateFIRFilter(FIRFilter): new_dim).sel({time_dim: viz_date}) else: tmp_filter_data = None - valid_range = range(int((len(h) + 1) / 2) if minimum_length is None else minimum_length, + valid_range = range(-int((len(h) + 1) / 2) if minimum_length is None else -minimum_length, extend_length_opts + 1) - plot_data.append({"t0": viz_date, + plot_data.append({"t0": t0, + "viz_date": viz_date, "var": variable_name, "filter_input": filter_input_data.sel({time_dim: viz_date}), "filter_input_nc": tmp_filter_data, @@ -601,7 +606,8 @@ class ClimateFIRFilter(FIRFilter): {time_dim: slice(t_minus, t_plus - np.timedelta64(1, td_type))}).coords[ time_dim].values, "h": h, - "new_dim": new_dim}) + "new_dim": new_dim, + "offset": offset}) except: pass return plot_data @@ -640,7 +646,8 @@ class ClimateFIRFilter(FIRFilter): @staticmethod def _trim_data_to_minimum_length(data: xr.DataArray, extend_length_history: int, dim: str, - minimum_length: int = None, extend_length_future: int = 0) -> xr.DataArray: + 
minimum_length: int = None, extend_length_future: int = 0, + offset: int = 0) -> xr.DataArray: """ Trim data along given axis between either -minimum_length (if given) or -extend_length_history and extend_length_opts (which is default set to 0). @@ -656,7 +663,7 @@ class ClimateFIRFilter(FIRFilter): if minimum_length is None: return data.sel({dim: slice(-extend_length_history, extend_length_future)}, drop=True) else: - return data.sel({dim: slice(-minimum_length, extend_length_future)}, drop=True) + return data.sel({dim: slice(-minimum_length + offset, extend_length_future)}, drop=True) @staticmethod def _create_full_filter_result_array(template_array: xr.DataArray, result_array: xr.DataArray, new_dim: str, @@ -680,14 +687,10 @@ class ClimateFIRFilter(FIRFilter): @TimeTrackingWrapper def clim_filter(self, data, fs, cutoff_high, order, apriori=None, sel_opts=None, sampling="1d", time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming", - minimum_length=None, new_dim="window", plot_dates=None, station_name=None, - extend_length_opts: Union[dict, int] = None, offset: Union[dict, int] = None): + minimum_length=0, next_order=0, new_dim="window", plot_dates=None, station_name=None, + extend_opts: int = 0, offset: int = 0): logging.debug(f"{station_name}: extend apriori") - # set data horizon - offset = offset if offset is not None else {} - # if extent_length_opts is not given, use same as offset - extend_opts = extend_length_opts if extend_length_opts is not None else offset # calculate apriori information from data if not given and extend its range if not sufficient long enough if apriori is None: @@ -699,9 +702,9 @@ class ClimateFIRFilter(FIRFilter): h = self._calculate_filter_coefficients(window, order, cutoff_high, fs) length = len(h) - # use filter length if no minimum is given, otherwise use minimum + half filter length for extension - extend_length_history = length if minimum_length is None else minimum_length + int((length + 1) / 2) - 
extend_length_future = int((length + 1) / 2) + 1 + # set data extend that is required for filtering + extend_length_history = minimum_length + int((next_order + 1) / 2) + int((length + 1) / 2) - offset + extend_length_future = offset + int((next_order + 1) / 2) + int((length + 1) / 2) # collect some data for visualization plot_pos = np.array([0.25, 1.5, 2.75, 4]) * 365 * fs @@ -711,21 +714,20 @@ class ClimateFIRFilter(FIRFilter): plot_data = [] coll = [] + coll_input = [] for var in reversed(data.coords[var_dim].values): logging.info(f"{station_name} ({var}): sel data") _start, _end = self._get_year_interval(data, time_dim) - offset_var = offset.get(var, 0) if isinstance(offset, dict) else offset - extend_opts_var = extend_opts.get(var, 0) if isinstance(extend_opts, dict) else extend_opts filt_coll = [] + filt_input_coll = [] for _year in range(_start, _end + 1): logging.debug(f"{station_name} ({var}): year={_year}") # select observations and apriori data time_slice = self._create_time_range_extend( - _year, sampling, max(extend_length_history, extend_length_future + max(extend_opts_var, - offset_var))) + _year, sampling, max(extend_length_history, extend_length_future)) d = data.sel({var_dim: [var], time_dim: time_slice}) a = apriori.sel({var_dim: [var], time_dim: time_slice}) if len(d.coords[time_dim]) == 0: # no data at all for this year @@ -733,7 +735,7 @@ class ClimateFIRFilter(FIRFilter): # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] filter_input_data = self.combine_observation_and_apriori(d, a, time_dim, new_dim, extend_length_history, - extend_length_future + offset_var, extend_length_separator=extend_opts_var) + extend_length_future, extend_length_separator=extend_opts) # select only data for current year try: @@ -751,26 +753,35 @@ class ClimateFIRFilter(FIRFilter): vectorize=True, kwargs={"h": h}, output_dtypes=[d.dtype]) # trim data if required - trimmed = self._trim_data_to_minimum_length(filt, 
extend_length_history, new_dim, minimum_length, - extend_length_future=max(extend_opts_var, offset_var)) + valid_range_end = int(filt.coords[new_dim].max() - (length + 1) / 2) + 1 + trimmed = self._trim_data_to_minimum_length(filt, extend_length_history, new_dim, minimum_length + int((next_order + 1) / 2), + extend_length_future=min(extend_length_future, valid_range_end), + offset=offset) filt_coll.append(trimmed) + trimmed = self._trim_data_to_minimum_length(filter_input_data, extend_length_history, new_dim, minimum_length + int((next_order + 1) / 2), + extend_length_future=min(extend_length_future, valid_range_end), + offset=offset) + filt_input_coll.append(trimmed) # visualization plot_data.extend(self.create_visualization(filt, d, filter_input_data, plot_dates, time_dim, new_dim, sampling, extend_length_history, extend_length_future, - minimum_length, h, var, extend_opts_var)) #todo not updated with offset_var + minimum_length, h, var, extend_opts, 0)) # collect all filter results coll.append(xr.concat(filt_coll, time_dim)) + coll_input.append(xr.concat(filt_input_coll, time_dim)) gc.collect() # concat all variables logging.debug(f"{station_name}: concat all variables") res = xr.concat(coll, var_dim) + res_input = xr.concat(coll_input, var_dim) # create result array with same shape like input data, gaps are filled by nans res_full = self._create_full_filter_result_array(data, res, new_dim, station_name) - return res_full, h, apriori, plot_data + res_input_full = self._create_full_filter_result_array(data, res_input, new_dim, station_name) + return res_full, res_input_full, h, apriori, plot_data @staticmethod def _create_time_range_extend(year: int, sampling: str, extend_length: int) -> slice: diff --git a/mlair/plotting/data_insight_plotting.py b/mlair/plotting/data_insight_plotting.py index 1eee96623d4fed6fcfb23fd1438a954a4aca230f..b8a31fa1a20c9127e9666119d7fc8d32872eb3bb 100644 --- a/mlair/plotting/data_insight_plotting.py +++ 
b/mlair/plotting/data_insight_plotting.py @@ -959,6 +959,7 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover "t0": {"color": "lightgrey", "lw": 6, "label": "$t_0$"} } + self.variables_list = [] plot_folder = os.path.join(os.path.abspath(plot_folder), "climFIR") self.fir_filter_convolve = fir_filter_convolve super().__init__(plot_folder, plot_name=None, rc_params=rc_params) @@ -976,36 +977,42 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover for p_d in plot_data: var = p_d.get("var") t0 = p_d.get("t0") + viz_date = p_d.get("viz_date") filter_input = p_d.get("filter_input") filter_input_nc = p_d.get("filter_input_nc") valid_range = p_d.get("valid_range") time_range = p_d.get("time_range") + offset = p_d.get("offset") if new_dim is None: new_dim = p_d.get("new_dim") else: assert new_dim == p_d.get("new_dim") h = p_d.get("h") plot_dict_var = plot_dict.get(var, {}) - plot_dict_t0 = plot_dict_var.get(t0, {}) + plot_dict_t0 = plot_dict_var.get(viz_date, {}) plot_dict_order = {"filter_input": filter_input, "filter_input_nc": filter_input_nc, "valid_range": valid_range, "time_range": time_range, - "order": len(h), "h": h} + "order": len(h), "h": h, + "t0": t0, + "offset": offset} plot_dict_t0[i] = plot_dict_order - plot_dict_var[t0] = plot_dict_t0 + plot_dict_var[viz_date] = plot_dict_t0 plot_dict[var] = plot_dict_var + self.variables_list = list(plot_dict.keys()) return plot_dict, new_dim def _plot(self, plot_dict, sampling, new_dim="window"): td_type = {"1d": "D", "1H": "h"}.get(sampling) for var, viz_date_dict in plot_dict.items(): - for it0, t0 in enumerate(viz_date_dict.keys()): - viz_data = viz_date_dict[t0] + for iviz_date, viz_date in enumerate(viz_date_dict.keys()): + viz_data = viz_date_dict[viz_date] residuum_true = None try: for ifilter in sorted(viz_data.keys()): data = viz_data[ifilter] + t0 = data["t0"] filter_input = data["filter_input"] filter_input_nc = data["filter_input_nc"] if residuum_true is None else 
residuum_true.sel( {new_dim: filter_input.coords[new_dim]}) @@ -1013,17 +1020,18 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover time_axis = data["time_range"] filter_order = data["order"] h = data["h"] + offset = data["offset"] fig, ax = plt.subplots() # plot backgrounds - self._plot_valid_area(ax, t0, valid_range, td_type) + self._plot_valid_area(ax, viz_date, valid_range, td_type) self._plot_t0(ax, t0) # original data self._plot_original_data(ax, time_axis, filter_input_nc) # clim apriori - self._plot_apriori(ax, time_axis, filter_input, new_dim, ifilter) + self._plot_apriori(ax, time_axis, filter_input, new_dim, ifilter, offset=1) # clim filter response residuum_estimated = self._plot_clim_filter(ax, time_axis, filter_input, new_dim, h, @@ -1040,12 +1048,12 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover plt.legend() fig.autofmt_xdate() plt.tight_layout() - self.plot_name = f"climFIR_{self._name}_{str(var)}_{it0}_{ifilter}" + self.plot_name = f"climFIR_{self._name}_{str(var)}_{iviz_date}_{ifilter}" self._save() # plot residuum fig, ax = plt.subplots() - self._plot_valid_area(ax, t0, valid_range, td_type) + self._plot_valid_area(ax, viz_date, valid_range, td_type) self._plot_t0(ax, t0) self._plot_series(ax, time_axis, residuum_true.values.flatten(), style="ideal") self._plot_series(ax, time_axis, residuum_estimated.values.flatten(), style="clim") @@ -1055,7 +1063,7 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover fig.autofmt_xdate() plt.tight_layout() - self.plot_name = f"climFIR_{self._name}_{str(var)}_{it0}_{ifilter}_residuum" + self.plot_name = f"climFIR_{self._name}_{str(var)}_{iviz_date}_{ifilter}_residuum" self._save() except Exception as e: logging.info(f"Could not create plot because of:\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") @@ -1068,8 +1076,8 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover Use order and valid_range to find a good zoom in that 
hides edges of filter values that are effected by reduced filter order. Limits are returned to be usable for other plots. """ - t_minus_delta = max(1.5 * valid_range.start, 0.3 * order) - t_plus_delta = max(0.5 * valid_range.start, 0.3 * order) + t_minus_delta = max(1.5 * (valid_range.stop - valid_range.start), 0.3 * order) + t_plus_delta = max(0.5 * (valid_range.stop - valid_range.start), 0.3 * order) t_minus = t0 + np.timedelta64(-int(t_minus_delta), td_type) t_plus = t0 + np.timedelta64(int(t_plus_delta), td_type) ax_start = max(t_minus, time_axis[0]) @@ -1078,7 +1086,7 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover return ax_start, ax_end def _plot_valid_area(self, ax, t0, valid_range, td_type): - ax.axvspan(t0 + np.timedelta64(-valid_range.start, td_type), + ax.axvspan(t0 + np.timedelta64(valid_range.start, td_type), t0 + np.timedelta64(valid_range.stop - 1, td_type), **self.style_dict["valid_area"]) def _plot_t0(self, ax, t0): @@ -1094,12 +1102,12 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover # self._plot_series(ax, time_axis, filter_input_nc.values.flatten(), color="darkgrey", linestyle="dashed", # label="original") - def _plot_apriori(self, ax, time_axis, data, new_dim, ifilter): + def _plot_apriori(self, ax, time_axis, data, new_dim, ifilter, offset): # clim apriori filter_input = data if ifilter == 0: d_tmp = filter_input.sel( - {new_dim: slice(0, filter_input.coords[new_dim].values.max())}).values.flatten() + {new_dim: slice(offset, filter_input.coords[new_dim].values.max())}).values.flatten() else: d_tmp = filter_input.values.flatten() self._plot_series(ax, time_axis[len(time_axis) - len(d_tmp):], d_tmp, style="apriori") @@ -1138,7 +1146,7 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover def _store_plot_data(self, data): """Store plot data. 
Could be loaded in a notebook to redraw."""
-        file = os.path.join(self.plot_folder, "plot_data.pickle")
+        file = os.path.join(self.plot_folder, "_".join(self.variables_list) + "_plot_data.pickle")
         with open(file, "wb") as f:
             dill.dump(data, f)