diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py index 4707fd580562a68fd6b2dc0843551905e70d7e50..43662ff9fb53a13a1b3d93f4daa2cfae6a0ef7ae 100644 --- a/mlair/data_handler/data_handler_with_filter.py +++ b/mlair/data_handler/data_handler_with_filter.py @@ -436,7 +436,8 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation apriori_type=self.apriori_type, apriori=self.apriori, apriori_diurnal=self.apriori_diurnal, sel_opts=self.apriori_sel_opts, plot_path=self.plot_path, plot_name=plot_name, - minimum_length=self.window_history_size, new_dim=self.window_dim) + minimum_length=self.window_history_size, new_dim=self.window_dim, + station_name=self.station) self.climate_filter_coeff = climate_filter.filter_coefficients # store apriori information: store all if residuum_stat method was used, otherwise just store initial apriori diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index 36c93b04486fc9be013af2c4f34d2b3ee1bd84c2..7ff7705752101a7107f5ce5a9149fa2ff9553388 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -59,7 +59,7 @@ class ClimateFIRFilter: def __init__(self, data, fs, order, cutoff, window, time_dim, var_dim, apriori=None, apriori_type=None, apriori_diurnal=False, sel_opts=None, plot_path=None, plot_name=None, - minimum_length=None, new_dim=None): + minimum_length=None, new_dim=None, station_name=None): """ :param data: data to filter :param fs: sampling frequency in 1/days -> 1d: fs=1 -> 1H: fs=24 @@ -87,8 +87,6 @@ class ClimateFIRFilter: sampling = {1: "1d", 24: "1H"}.get(int(fs)) logging.debug(f"{plot_name}: create diurnal_anomalies") if apriori_diurnal is True and sampling == "1H": - # diurnal_anomalies = self.create_hourly_mean(data, sel_opts=sel_opts, sampling=sampling, time_dim=time_dim, - # as_anomaly=True) diurnal_anomalies = self.create_seasonal_hourly_mean(data, sel_opts=sel_opts, sampling=sampling, time_dim=time_dim, as_anomaly=True) @@ -118,7 +116,7 @@ class ClimateFIRFilter: sel_opts=sel_opts, sampling=sampling, time_dim=time_dim, window=window, var_dim=var_dim, minimum_length=_minimum_length, new_dim=new_dim, - plot_dates=plot_dates) + plot_dates=plot_dates, station_name=station_name) logging.info(f"{plot_name}: finished clim_filter calculation") if minimum_length is None: @@ -136,7 +134,7 @@ class ClimateFIRFilter: if new_dim in input_data.coords: input_data = input_data.sel({new_dim: coord_range}) - fi else: - input_data = self._shift_data(input_data, coord_range, time_dim, var_dim, new_dim) - fi + input_data = self._shift_data(input_data, coord_range, time_dim, new_dim) - fi # create new apriori information for next iteration if no further apriori is provided if len(apriori_list) <= i + 1: @@ -344,12 +342,88 @@ class ClimateFIRFilter: return apriori + def combine_observation_and_apriori(self, data: xr.DataArray, apriori: xr.DataArray, time_dim: str, new_dim: str, + extend_length_history: int, extend_length_future: int) -> xr.DataArray: + """ + Combine historical data / observations ("data") and climatological statistics ("apriori"). Historical data are + used on interval [t0 - extend_length_history, t0] and apriori is used on [t0 + 1, t0 + extend_length_future]. + + :param data: historical data for past values, must contain dimensions time_dim and var_dim and might also have + a new_dim dimension + :param apriori: climatological estimate for future values, must contain dimensions time_dim and var_dim, but + can also have dimension new_dim + :param time_dim: name of temporal dimension + :param new_dim: name of new dim on which data is combined along + :param extend_length_history: number of time steps to use from data + :param extend_length_future: number of time steps to use from apriori (minus 1) + :returns: combined data array + """ + # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] + if new_dim not in data.coords: + history = self._shift_data(data, range(int(-extend_length_history), 1), time_dim, new_dim) + else: + history = data.sel({new_dim: slice(int(-extend_length_history), 0)}) + if new_dim not in apriori.coords: + future = self._shift_data(apriori, range(1, extend_length_future), time_dim, new_dim) + else: + future = apriori.sel({new_dim: slice(1, extend_length_future)}) + filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") + return filter_input_data + + def create_visualization(self, filtered, data, filter_input_data, plot_dates, time_dim, new_dim, sampling, extend_length_history, + extend_length_future, minimum_length, h, variable_name): + plot_data = [] + for viz_date in set(plot_dates).intersection(filtered.coords[time_dim].values): + try: + td_type = {"1d": "D", "1H": "h"}.get(sampling) + t_minus = viz_date + np.timedelta64(int(-extend_length_history), td_type) + t_plus = viz_date + np.timedelta64(int(extend_length_future), td_type) + if new_dim not in data.coords: + tmp_filter_data = self._shift_data(data.sel({time_dim: slice(t_minus, t_plus)}), + range(int(-extend_length_history), + int(extend_length_future)), + time_dim, + new_dim).sel({time_dim: viz_date}) + else: + # tmp_filter_data = d.sel({time_dim: viz_date, + # new_dim: slice(int(-extend_length_history), int(extend_length_future))}) + tmp_filter_data = None + valid_range = range(int((len(h) + 1) / 2) if minimum_length is None else minimum_length, 1) + plot_data.append({"t0": viz_date, + "var": variable_name, + "filter_input": filter_input_data.sel({time_dim: viz_date}), + "filter_input_nc": tmp_filter_data, + "valid_range": valid_range, + "time_range": data.sel( + {time_dim: slice(t_minus, t_plus - np.timedelta64(1, td_type))}).coords[ + time_dim].values, + "h": h, + "new_dim": new_dim}) + except: + pass + return plot_data + + @staticmethod + def _get_year_interval(data, time_dim): + start = pd.to_datetime(data.coords[time_dim].min().values).year + end = pd.to_datetime(data.coords[time_dim].max().values).year + return start, end + + @staticmethod + def _calculate_filter_coefficients(window, order, cutoff_high, fs): + # calculate FIR filter coefficients + if window == "kzf": + h = firwin_kzf(*order) + else: + h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window) + return h + @TimeTrackingWrapper def clim_filter(self, data, fs, cutoff_high, order, apriori=None, sel_opts=None, sampling="1d", time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming", - minimum_length=None, new_dim="window", plot_dates=None): + minimum_length=None, new_dim="window", plot_dates=None, station_name=None): - logging.debug(f"{data.coords['Stations'].values[0]}: extend apriori") + logging.debug(f"{station_name}: extend apriori") # calculate apriori information from data if not given and extend its range if not sufficient long enough if apriori is None: @@ -358,10 +432,7 @@ class ClimateFIRFilter: apriori = self.extend_apriori(data, apriori, time_dim, sampling) # calculate FIR filter coefficients - if window == "kzf": - h = firwin_kzf(*order) - else: - h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window) + h = self._calculate_filter_coefficients(window, order, cutoff_high, fs) length = len(h) # use filter length if no minimum is given, otherwise use minimum + half filter length for extension @@ -378,14 +449,14 @@ class ClimateFIRFilter: coll = [] for var in reversed(data.coords[var_dim].values): - logging.info(f"{data.coords['Stations'].values[0]} ({var}): sel data") + logging.info(f"{station_name} ({var}): sel data") - _start = pd.to_datetime(data.coords[time_dim].min().values).year - _end = pd.to_datetime(data.coords[time_dim].max().values).year + _start, _end = self._get_year_interval(data, time_dim) filt_coll = [] for _year in range(_start, _end + 1): - logging.debug(f"{data.coords['Stations'].values[0]} ({var}): year={_year}") + logging.debug(f"{station_name} ({var}): year={_year}") + # select observations and apriori data time_slice = self._create_time_range_extend(_year, sampling, extend_length_history) d = data.sel({var_dim: [var], time_dim: time_slice}) a = apriori.sel({var_dim: [var], time_dim: time_slice}) @@ -393,15 +464,10 @@ class ClimateFIRFilter: continue # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] - if new_dim not in d.coords: - history = self._shift_data(d, range(int(-extend_length_history), 1), time_dim, var_dim, new_dim) - else: - history = d.sel({new_dim: slice(int(-extend_length_history), 0)}) - if new_dim not in a.coords: - future = self._shift_data(a, range(1, extend_length_future), time_dim, var_dim, new_dim) - else: - future = a.sel({new_dim: slice(1, extend_length_future)}) - filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") + filter_input_data = self.combine_observation_and_apriori(d, a, time_dim, new_dim, extend_length_history, + extend_length_future) + + # select only data for current year try: filter_input_data = filter_input_data.sel({time_dim: str(_year)}) except KeyError: # no valid data for this year @@ -409,65 +475,37 @@ class ClimateFIRFilter: if len(filter_input_data.coords[time_dim]) == 0: # no valid data for this year continue - logging.debug(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve") - with TimeTracking(name=f"{data.coords['Stations'].values[0]} ({var}): filter convolve", - logging_level=logging.DEBUG): + # apply filter + logging.debug(f"{station_name} ({var}): start filter convolve") + with TimeTracking(name=f"{station_name} ({var}): filter convolve", logging_level=logging.DEBUG): filt = xr.apply_ufunc(fir_filter_convolve, filter_input_data, - input_core_dims=[[new_dim]], - output_core_dims=[[new_dim]], - vectorize=True, - kwargs={"h": h}, - output_dtypes=[d.dtype]) + input_core_dims=[[new_dim]], output_core_dims=[[new_dim]], + vectorize=True, kwargs={"h": h}, output_dtypes=[d.dtype]) + # trim data if required if minimum_length is None: filt_coll.append(filt.sel({new_dim: slice(-extend_length_history, 0)}, drop=True)) else: filt_coll.append(filt.sel({new_dim: slice(-minimum_length, 0)}, drop=True)) # visualization - for viz_date in set(plot_dates).intersection(filt.coords[time_dim].values): - try: - td_type = {"1d": "D", "1H": "h"}.get(sampling) - t_minus = viz_date + np.timedelta64(int(-extend_length_history), td_type) - t_plus = viz_date + np.timedelta64(int(extend_length_future), td_type) - if new_dim not in d.coords: - tmp_filter_data = self._shift_data(d.sel({time_dim: slice(t_minus, t_plus)}), - range(int(-extend_length_history), - int(extend_length_future)), - time_dim, var_dim, new_dim).sel({time_dim: viz_date}) - else: - # tmp_filter_data = d.sel({time_dim: viz_date, - # new_dim: slice(int(-extend_length_history), int(extend_length_future))}) - tmp_filter_data = None - valid_range = range(int((length + 1) / 2) if minimum_length is None else minimum_length, 1) - plot_data.append({"t0": viz_date, - "var": var, - "filter_input": filter_input_data.sel({time_dim: viz_date}), - "filter_input_nc": tmp_filter_data, - "valid_range": valid_range, - "time_range": d.sel( - {time_dim: slice(t_minus, t_plus - np.timedelta64(1, td_type))}).coords[ - time_dim].values, - "h": h, - "new_dim": new_dim}) - except: - pass + plot_data.extend(self.create_visualization(filt, d, filter_input_data, plot_dates, time_dim, new_dim, + sampling, extend_length_history, extend_length_future, + minimum_length, h, var)) # collect all filter results coll.append(xr.concat(filt_coll, time_dim)) gc.collect() - logging.debug(f"{data.coords['Stations'].values[0]}: concat all variables") + # concat all variables + logging.debug(f"{station_name}: concat all variables") res = xr.concat(coll, var_dim) - # create result array with same shape like input data, gabs are filled by nans - logging.debug(f"{data.coords['Stations'].values[0]}: create res_full") + # create result array with same shape like input data, gaps are filled by nans + logging.debug(f"{station_name}: create res_full") new_coords = {**{k: data.coords[k].values for k in data.coords if k != new_dim}, new_dim: res.coords[new_dim]} dims = [*data.dims, new_dim] if new_dim not in data.dims else data.dims res = res.transpose(*dims) - # res_full = xr.DataArray(dims=dims, coords=new_coords) - # res_full.loc[res.coords] = res - # res_full.compute() res_full = res.broadcast_like(xr.DataArray(dims=dims, coords=new_coords)) return res_full, h, apriori, plot_data @@ -490,19 +528,35 @@ class ClimateFIRFilter: raise ValueError("Could not create new dimension.") return new_dim - def _shift_data(self, data, index_value, time_dim, squeeze_dim, new_dim): + def _shift_data(self, data: xr.DataArray, index_value: range, time_dim: str, new_dim: str) -> xr.DataArray: + """ + Shift data multiple times to create history or future along dimension new_dim for each time step. + + :param data: data set to shift + :param index_value: range of integers to span history and/or future + :param time_dim: name of temporal dimension that should be shifted + :param new_dim: name of dimension create by data shift + :return: shifted data + """ coll = [] for i in index_value: coll.append(data.shift({time_dim: -i})) - new_ind = self.create_index_array(new_dim, index_value, squeeze_dim) + new_ind = self.create_index_array(new_dim, index_value) return xr.concat(coll, dim=new_ind) @staticmethod - def create_index_array(index_name: str, index_value, squeeze_dim: str): + def create_index_array(index_name: str, index_value: range): + """ + Create index array from a range object to use as index of a data array. + + :param index_name: name of the index dimension + :param index_value: range of values to use as indexes + :returns: index array for given range of values + """ ind = pd.DataFrame({'val': index_value}, index=index_value) - res = xr.Dataset.from_dataframe(ind).to_array(squeeze_dim).rename({'index': index_name}).squeeze( - dim=squeeze_dim, - drop=True) + tmp_dim = index_name + "tmp" + res = xr.Dataset.from_dataframe(ind).to_array(tmp_dim).rename({'index': index_name}) + res = res.squeeze(dim=tmp_dim, drop=True) res.name = index_name return res diff --git a/test/test_data_handler/test_data_handler_with_filter.py b/test/test_data_handler/test_data_handler_with_filter.py new file mode 100644 index 0000000000000000000000000000000000000000..07afe80eb700fa95415ca62e1e2aceb4f73d7db5 --- /dev/null +++ b/test/test_data_handler/test_data_handler_with_filter.py @@ -0,0 +1,5 @@ +import pytest +import inspect + +from mlair.data_handler.abstract_data_handler import AbstractDataHandler + diff --git a/test/test_helpers/test_filter.py b/test/test_helpers/test_filter.py new file mode 100644 index 0000000000000000000000000000000000000000..8900c78bb79aaecd05169e7f46474dc9b21f57a4 --- /dev/null +++ b/test/test_helpers/test_filter.py @@ -0,0 +1,77 @@ +__author__ = 'Lukas Leufen' +__date__ = '2021-11-18' + +import pytest +import inspect +import numpy as np +import xarray as xr + +from mlair.helpers.filter import ClimateFIRFilter + + +class TestClimateFIRFilter: + + @pytest.fixture + def time_dim(self): + return "datetime" + + @pytest.fixture + def new_dim(self): + return "history" + + @pytest.fixture + def data(self): + pos = np.linspace(0, 4, num=100) + return np.cos(pos * np.pi) + + @pytest.fixture + def xr_array(self, data, time_dim): + start = np.datetime64("2010-01-01 00:00") + time_index = [start + np.timedelta64(d, "h") for d in range(len(data))] + array = xr.DataArray(data, dims=time_dim, coords={time_dim: time_index}) + return array + + def test_combine_observation_and_apriori_no_new_dim(self, xr_array, time_dim): + obj = object.__new__(ClimateFIRFilter) + apriori = xr.ones_like(xr_array) + res = obj.combine_observation_and_apriori(xr_array, apriori, time_dim, "window", 20, 10) + assert res.coords[time_dim].values[0] == xr_array.coords[time_dim].values[20] + first_entry = res.sel({time_dim: res.coords[time_dim].values[0]}) + assert np.testing.assert_array_equal(first_entry.sel(window=range(-20, 1)).values, xr_array.values[:21]) is None + assert np.testing.assert_array_equal(first_entry.sel(window=range(1, 10)).values, apriori.values[21:30]) is None + + def test_combine_observation_and_apriori_with_new_dim(self, xr_array, time_dim): + obj = object.__new__(ClimateFIRFilter) + apriori = xr.ones_like(xr_array) + xr_array = obj._shift_data(xr_array, range(-20, 1), time_dim, new_dim="window") + apriori = obj._shift_data(apriori, range(1, 10), time_dim, new_dim="window") + res = obj.combine_observation_and_apriori(xr_array, apriori, time_dim, "window", 10, 10) + assert res.coords[time_dim].values[0] == xr_array.coords[time_dim].values[10] + date_pos = res.coords[time_dim].values[0] + first_entry = res.sel({time_dim: date_pos}) + assert xr.testing.assert_equal(first_entry.sel(window=range(-10, 1)), + xr_array.sel({time_dim: date_pos, "window": range(-10, 1)})) is None + assert xr.testing.assert_equal(first_entry.sel(window=range(1, 10)), apriori.sel({time_dim: date_pos})) is None + + def test_shift_data(self, xr_array, time_dim): + obj = object.__new__(ClimateFIRFilter) + index_values = range(-15, 1) + res = obj._shift_data(xr_array, index_values, time_dim, new_dim="window") + assert len(res.dims) == 2 + assert res.dims == [time_dim, "window"] + assert np.testing.assert_array_equal(res.coords["window"].values, np.arange(-15, 1)) is None + sel = res.sel({time_dim: res.coords[time_dim].values[15]}) + assert sel.sel(window=-15).values == xr_array.sel({time_dim: xr_array.coords[time_dim].values[0]}).values + assert sel.sel(window=0).values == xr_array.sel({time_dim: xr_array.coords[time_dim].values[15]}).values + + def test_create_index_array(self): + obj = object.__new__(ClimateFIRFilter) + index_name = "test_index_name" + index_values = range(-10, 1) + res = obj.create_index_array(index_name, index_values) + assert len(res.dims) == 1 + assert res.dims[0] == index_name + assert res.shape == (11,) + assert np.testing.assert_array_equal(res.values, np.arange(-10, 1)) is None + + diff --git a/test/test_helpers/test_testing_helpers.py b/test/test_helpers/test_testing_helpers.py index 385161c740f386847ef2f2dc4df17c1c84fa7fa5..b247264d819571d6c55c781d17fdf50c9aec0bc7 100644 --- a/test/test_helpers/test_testing_helpers.py +++ b/test/test_helpers/test_testing_helpers.py @@ -11,7 +11,8 @@ class TestPyTestRegex: def test_init(self): test_regex = PyTestRegex(r"TestString\d+") - assert isinstance(test_regex._regex, re._pattern_type) + pattern = re._pattern_type if hasattr(re, "_pattern_type") else re.Pattern + assert isinstance(test_regex._regex, pattern) def test_eq(self): assert PyTestRegex(r"TestString\d*") == "TestString"