diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index 36127063d21186873523a3f86b84e25038bfd99d..6c180191cb6325347ace8fa2ec3a0b09c1e7750f 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -469,6 +469,43 @@ class ClimateFIRFilter: h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window) return h + @staticmethod + def _trim_data_to_minimum_length(data: xr.DataArray, extend_length_history: int, dim: str, + minimum_length: int = None) -> xr.DataArray: + """ + Trim data along given axis between either -minimum_length (if given) or -extend_length_history and 0. + + :param data: data to trim + :param extend_length_history: start number for trim range (transformed to negative), only used if parameter + minimum_length is not provided + :param dim: dim to apply trim on + :param minimum_length: start number for trim range (transformed to negative), preferably used (default None) + :returns: trimmed data + """ + if minimum_length is None: + return data.sel({dim: slice(-extend_length_history, 0)}, drop=True) + else: + return data.sel({dim: slice(-minimum_length, 0)}, drop=True) + + @staticmethod + def _create_full_filter_result_array(template_array: xr.DataArray, result_array: xr.DataArray, new_dim: str, + station_name: str = None) -> xr.DataArray: + """ + Create result filter array with same shape line given template data (should be the original input data before + filtering the data). All gaps are filled by nans. + + :param template_array: this array is used as template for shape and ordering of dims + :param result_array: array with data that are filled into template + :param new_dim: new dimension which is shifted/appended to/at the end (if present or not) + :param station_name: string that is attached to logging (default None) + """ + logging.debug(f"{station_name}: create res_full") + new_coords = {**{k: template_array.coords[k].values for k in template_array.coords if k != new_dim}, + new_dim: result_array.coords[new_dim]} + dims = [*template_array.dims, new_dim] if new_dim not in template_array.dims else template_array.dims + result_array = result_array.transpose(*dims) + return result_array.broadcast_like(xr.DataArray(dims=dims, coords=new_coords)) + @TimeTrackingWrapper def clim_filter(self, data, fs, cutoff_high, order, apriori=None, sel_opts=None, sampling="1d", time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming", @@ -534,10 +571,8 @@ class ClimateFIRFilter: vectorize=True, kwargs={"h": h}, output_dtypes=[d.dtype]) # trim data if required - if minimum_length is None: - filt_coll.append(filt.sel({new_dim: slice(-extend_length_history, 0)}, drop=True)) - else: - filt_coll.append(filt.sel({new_dim: slice(-minimum_length, 0)}, drop=True)) + trimmed = self._trim_data_to_minimum_length(filt, extend_length_history, new_dim, minimum_length) + filt_coll.append(trimmed) # visualization plot_data.extend(self.create_visualization(filt, d, filter_input_data, plot_dates, time_dim, new_dim, @@ -553,11 +588,7 @@ class ClimateFIRFilter: res = xr.concat(coll, var_dim) # create result array with same shape like input data, gaps are filled by nans - logging.debug(f"{station_name}: create res_full") - new_coords = {**{k: data.coords[k].values for k in data.coords if k != new_dim}, new_dim: res.coords[new_dim]} - dims = [*data.dims, new_dim] if new_dim not in data.dims else data.dims - res = res.transpose(*dims) - res_full = res.broadcast_like(xr.DataArray(dims=dims, coords=new_coords)) + res_full = self._create_full_filter_result_array(data, res, new_dim, station_name) return res_full, h, apriori, plot_data @staticmethod @@ -899,7 +930,7 @@ def firwin_kzf(m: int, k: int) -> np.array: return coef / (m ** k) -def omega_null_kzf(m, k, alpha=0.5): +def omega_null_kzf(m: int, k: int, alpha: float = 0.5) -> float: a = np.sqrt(6) / np.pi b = 1 / (2 * np.array(k)) c = 1 - alpha ** b diff --git a/test/test_helpers/test_filter.py b/test/test_helpers/test_filter.py index aecdd04dcd6a0542c7cf4989eec002e6920d8c99..7374ecae434a4f2b5a3e9f09df47d1a14b9a0f8a 100644 --- a/test/test_helpers/test_filter.py +++ b/test/test_helpers/test_filter.py @@ -5,19 +5,20 @@ import pytest import inspect import numpy as np import xarray as xr +import pandas as pd -from mlair.helpers.filter import ClimateFIRFilter, filter_width_kzf, firwin_kzf +from mlair.helpers.filter import ClimateFIRFilter, filter_width_kzf, firwin_kzf, omega_null_kzf, fir_filter_convolve class TestClimateFIRFilter: @pytest.fixture - def time_dim(self): - return "datetime" + def var_dim(self): + return "variables" @pytest.fixture - def new_dim(self): - return "history" + def time_dim(self): + return "datetime" @pytest.fixture def data(self): @@ -40,6 +41,20 @@ class TestClimateFIRFilter: coords={time_dim: time_index, "station": ["DE266X"]}) return array + @pytest.fixture + def xr_array_long_with_var(self, data, time_dim, var_dim): + start = np.datetime64("2010-01-01 00:00") + time_index = [start + np.timedelta64(175 * h, "h") for h in range(len(data))] + array = xr.DataArray(data.reshape(*data.shape, 1), dims=[time_dim, "station"], + coords={time_dim: time_index, "station": ["DE266X"]}) + array = array.resample({time_dim: "1H"}).interpolate() + new_data = xr.concat([array, + array + np.sin(np.arange(array.shape[0]) * 2 * np.pi / 24).reshape(*array.shape), + array + np.random.random(size=array.shape), + array * np.random.random(size=array.shape)], + dim=pd.Index(["o3", "temp", "wind", "sun"], name=var_dim)) + return new_data + def test_combine_observation_and_apriori_no_new_dim(self, xr_array, time_dim): obj = object.__new__(ClimateFIRFilter) apriori = xr.ones_like(xr_array) @@ -276,6 +291,82 @@ class TestClimateFIRFilter: assert obj.apriori_data == [10, 11, 12, 13] assert obj.initial_apriori_data == 10 + def test_trim_data_to_minimum_length(self, xr_array, time_dim): + obj = object.__new__(ClimateFIRFilter) + xr_array = obj._shift_data(xr_array, range(-20, 1), time_dim, new_dim="window") + res = obj._trim_data_to_minimum_length(xr_array, 5, "window") + assert xr_array.shape == (21, 100, 1) + assert res.shape == (6, 100, 1) + res = obj._trim_data_to_minimum_length(xr_array, 5, "window", 10) + assert res.shape == (11, 100, 1) + res = obj._trim_data_to_minimum_length(xr_array, 30, "window") + assert res.shape == (21, 100, 1) + + def test_create_full_filter_result_array(self, xr_array, time_dim): + obj = object.__new__(ClimateFIRFilter) + xr_array_window = obj._shift_data(xr_array, range(-10, 1), time_dim, new_dim="window").dropna(time_dim) + res = obj._create_full_filter_result_array(xr_array, xr_array_window, "window") + assert res.dims == (*xr_array.dims, "window") + assert res.shape == (*xr_array.shape, 11) + res2 = obj._create_full_filter_result_array(res, xr_array_window, "window") + assert res.dims == res2.dims + assert res.shape == res2.shape + + def test_clim_filter(self, xr_array_long_with_var, time_dim, var_dim): + obj = object.__new__(ClimateFIRFilter) + filter_order = 10*24+1 + res = obj.clim_filter(xr_array_long_with_var, 24, 0.05, 10*24+1, sampling="1H", time_dim=time_dim, var_dim=var_dim) + assert len(res) == 4 + + # check filter data properties + assert res[0].shape == (*xr_array_long_with_var.shape, filter_order + 1) + assert res[0].dims == (*xr_array_long_with_var.dims, "window") + + # check filter properties + assert np.testing.assert_almost_equal( + res[1], obj._calculate_filter_coefficients("hamming", filter_order, 0.05, 24)) is None + + # check apriori + apriori = obj.create_monthly_mean(xr_array_long_with_var, time_dim, sampling="1H") + apriori = apriori.astype(xr_array_long_with_var.dtype) + apriori = obj.extend_apriori(xr_array_long_with_var, apriori, time_dim, "1H") + assert xr.testing.assert_equal(res[2], apriori) is None + + # check plot data format + assert isinstance(res[3], list) + assert isinstance(res[3][0], dict) + keys = {"t0", "var", "filter_input", "filter_input_nc", "valid_range", "time_range", "h", "new_dim"} + assert len(keys.symmetric_difference(res[3][0].keys())) == 0 + + def test_clim_filter_kwargs(self, xr_array_long_with_var, time_dim, var_dim): + obj = object.__new__(ClimateFIRFilter) + filter_order = 10 * 24 + 1 + apriori = obj.create_seasonal_hourly_mean(xr_array_long_with_var, time_dim, sampling="1H", as_anomaly=False) + apriori = apriori.astype(xr_array_long_with_var.dtype) + apriori = obj.extend_apriori(xr_array_long_with_var, apriori, time_dim, "1H") + plot_dates = [xr_array_long_with_var.coords[time_dim][1800].values] + res = obj.clim_filter(xr_array_long_with_var, 24, 0.05, 10 * 24 + 1, sampling="1H", time_dim=time_dim, + var_dim=var_dim, new_dim="total_new_dim", window=("kaiser", 5), minimum_length=1000, + apriori=apriori, plot_dates=plot_dates) + + assert res[0].shape == (*xr_array_long_with_var.shape, 1000 + 1) + assert res[0].dims == (*xr_array_long_with_var.dims, "total_new_dim") + assert np.testing.assert_almost_equal( + res[1], obj._calculate_filter_coefficients(("kaiser", 5), filter_order, 0.05, 24)) is None + assert xr.testing.assert_equal(res[2], apriori) is None + assert len(res[3]) == len(res[0].coords[var_dim]) + + +class TestFirFilterConvolve: + + def test_fir_filter_convolve(self): + data = np.cos(np.linspace(0, 4, num=100) * np.pi) + obj = object.__new__(ClimateFIRFilter) + h = obj._calculate_filter_coefficients("hamming", 21, 0.25, 1) + res = fir_filter_convolve(data, h) + assert res.shape == (100,) + assert np.testing.assert_almost_equal(np.dot(data[40:61], h) / sum(h), res[50]) is None + class TestFirwinKzf: @@ -293,3 +384,20 @@ class TestFilterWidthKzf: assert filter_width_kzf(3, 5) == 11 +class TestOmegaNullKzf: + + def test_omega_null_kzf(self): + assert np.testing.assert_almost_equal(omega_null_kzf(13, 3), 0.01986, decimal=5) is None + assert np.testing.assert_almost_equal(omega_null_kzf(105, 5), 0.00192, decimal=5) is None + assert np.testing.assert_almost_equal(omega_null_kzf(3, 5), 0.07103, decimal=5) is None + + def test_omega_null_kzf_alpha(self): + assert np.testing.assert_almost_equal(omega_null_kzf(3, 3, alpha=1), 0, decimal=1) is None + assert np.testing.assert_almost_equal(omega_null_kzf(3, 3, alpha=0), 0.25989, decimal=5) is None + assert np.testing.assert_almost_equal(omega_null_kzf(3, 3), omega_null_kzf(3, 3, alpha=0.5), decimal=5) is None + + + + + +