diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index d9226b7c52dcd7187916ac34c25e20360d0a0e0b..c289df3b6e3ef89688267a2c3e0d067a64a113d2 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -531,69 +531,89 @@ class ClimateFIRFilter: for var in reversed(data.coords[var_dim].values): # self._tmp_analysis(data, apriori, var, var_dim, length, time_dim, new_dim, h) logging.info(f"{data.coords['Stations'].values[0]} ({var}): sel data") - d = data.sel({var_dim: [var]}) - a = apriori.sel({var_dim: [var]}) - - # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] - logging.info(f"{data.coords['Stations'].values[0]} ({var}): history") - extend_length = length if minimum_length is None else max(length, minimum_length + int((length + 1) / 2)) - if new_dim not in d.coords: - history = self._shift_data(d, range(int(-extend_length), 1), time_dim, var_dim, new_dim) - gc.collect() - else: - history = d.sel({new_dim: slice(int(-extend_length), 0)}) - logging.info(f"{data.coords['Stations'].values[0]} ({var}): future") - diff = (a - history.sel(window=slice(-24, 1)).mean(new_dim)) - if new_dim not in a.coords: - future = self._shift_data(a, range(1, int(extend_length + 1)), time_dim, var_dim, new_dim) - # future = self._shift_data(a, range(1, int((length - 1) / 2) + 1), time_dim, var_dim, new_dim) - diff - gc.collect() - else: - future = a.sel({new_dim: slice(1, int(extend_length + 1))}) - logging.info(f"{data.coords['Stations'].values[0]} ({var}): concat to filter input") - - filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") - # filter_input_data = history.combine_first(future) - # history.sel(datetime=slice("2010-11-01", "2011-04-01"),variables="o3").plot() - # filter_input_data.sel(datetime=slice("2009-11-01", "2011-04-01"),variables="temp").plot() - # ToDo: remove all other filt methods, only keep the convolve one - time_axis = filter_input_data.coords[time_dim] - # apply vectorized fir filter along the tmp dimension - kwargs = {"fs": fs, "cutoff_high": cutoff_high, "order": order, - "causal": False, "padlen": int(min(padlen_factor, 1) * length), "h": h} - # with TimeTracking(name="numpy_vec"): - # filt = fir_filter_numpy_vectorized(filter_input_data, var_dim, new_dim, kwargs) - # with TimeTracking(name="xr_apply_ufunc"): - # filt = xr.apply_ufunc(fir_filter_vectorized, filter_input_data, time_axis, - # input_core_dims=[[new_dim], []], output_core_dims=[[new_dim]], vectorize=True, - # kwargs=kwargs) - logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve") - with TimeTracking(name="convolve"): - # slicer = slice(int(-(length - 1) / 2), int((length - 1) / 2)) - filt = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data, # .sel({new_dim: slicer}), - input_core_dims=[[new_dim]], - output_core_dims=[[new_dim]], - vectorize=True, - kwargs={"h": h}) + + _start = pd.to_datetime(data.coords[time_dim].min().values).year + _end = pd.to_datetime(data.coords[time_dim].max().values).year + filt_coll = [] + for _year in range(_start, _end + 1): + logging.info(f"{data.coords['Stations'].values[0]} ({var}): year={_year}") + extend_length = length if minimum_length is None else max(length, + minimum_length + int((length + 1) / 2)) + + time_slice = self._create_time_range_extend(_year, sampling, extend_length) + d = data.sel({var_dim: [var], time_dim: time_slice}) + a = apriori.sel({var_dim: [var], time_dim: time_slice}) + if len(d.coords[time_dim]) == 0: # no data at all for this year + continue + + # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] + logging.info(f"{data.coords['Stations'].values[0]} ({var}): history") + if new_dim not in d.coords: + history = self._shift_data(d, range(int(-extend_length), 1), time_dim, var_dim, new_dim) + gc.collect() + else: + history = d.sel({new_dim: slice(int(-extend_length), 0)}) + logging.info(f"{data.coords['Stations'].values[0]} ({var}): future") + diff = (a - history.sel(window=slice(-24, 1)).mean(new_dim)) + if new_dim not in a.coords: + future = self._shift_data(a, range(1, int(extend_length + 1)), time_dim, var_dim, new_dim) + # future = self._shift_data(a, range(1, int((length - 1) / 2) + 1), time_dim, var_dim, new_dim) - diff + gc.collect() + else: + future = a.sel({new_dim: slice(1, int(extend_length + 1))}) + logging.info(f"{data.coords['Stations'].values[0]} ({var}): concat to filter input") + + filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") + try: + filter_input_data = filter_input_data.sel({time_dim: str(_year)}) + except KeyError: # no valid data for this year + continue + if len(filter_input_data.coords[time_dim]) == 0: # no valid data for this year + continue + # filter_input_data = history.combine_first(future) + # history.sel(datetime=slice("2010-11-01", "2011-04-01"),variables="o3").plot() + # filter_input_data.sel(datetime=slice("2009-11-01", "2011-04-01"),variables="temp").plot() + # ToDo: remove all other filt methods, only keep the convolve one + time_axis = filter_input_data.coords[time_dim] + # apply vectorized fir filter along the tmp dimension + kwargs = {"fs": fs, "cutoff_high": cutoff_high, "order": order, + "causal": False, "padlen": int(min(padlen_factor, 1) * length), "h": h} + # with TimeTracking(name="numpy_vec"): + # filt = fir_filter_numpy_vectorized(filter_input_data, var_dim, new_dim, kwargs) + # with TimeTracking(name="xr_apply_ufunc"): + # filt = xr.apply_ufunc(fir_filter_vectorized, filter_input_data, time_axis, + # input_core_dims=[[new_dim], []], output_core_dims=[[new_dim]], vectorize=True, + # kwargs=kwargs) + logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve") + with TimeTracking(name="convolve"): + # slicer = slice(int(-(length - 1) / 2), int((length - 1) / 2)) + filt = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data, # .sel({new_dim: slicer}), + input_core_dims=[[new_dim]], + output_core_dims=[[new_dim]], + vectorize=True, + kwargs={"h": h}) + + filt_coll.append(filt.sel({new_dim: slice(-extend_length, 0)}, drop=True)) # plot # ToDo: enable plotting again - if self.plot_path is not None: - for i, time_pos in enumerate([0.25, 1.5, 2.75, 4]): # [0.25, 1.5, 2.75, 4] x 365 days - try: - pos = int(time_pos * 365 * fs) - filter_example = filter_input_data.isel({time_dim: pos}) - t0 = filter_example.coords[time_dim].values - t_slice = filter_input_data.isel( - {time_dim: slice(pos - int((length - 1) / 2), pos + int((length - 1) / 2) + 1)}).coords[ - time_dim].values - # self.plot(d, filter_example, var_dim, time_dim, t_slice, t0, f"{plot_index}_{i}") - except IndexError: - pass + # if self.plot_path is not None: + # for i, time_pos in enumerate([0.25, 1.5, 2.75, 4]): # [0.25, 1.5, 2.75, 4] x 365 days + # try: + # pos = int(time_pos * 365 * fs) + # filter_example = filter_input_data.isel({time_dim: pos}) + # t0 = filter_example.coords[time_dim].values + # t_slice = filter_input_data.isel( + # {time_dim: slice(pos - int((length - 1) / 2), pos + int((length - 1) / 2) + 1)}).coords[ + # time_dim].values + # # self.plot(d, filter_example, var_dim, time_dim, t_slice, t0, f"{plot_index}_{i}") + # except IndexError: + # pass # select only values at tmp dimension 0 at each point in time # coll.append(filt.sel({new_dim: 0}, drop=True)) - coll.append(filt.sel({new_dim: slice(-extend_length, 0)}, drop=True)) + # coll.append(filt.sel({new_dim: slice(-extend_length, 0)}, drop=True)) + coll.append(xr.concat(filt_coll, time_dim)) gc.collect() logging.info(f"{data.coords['Stations'].values[0]}: concat all variables") @@ -607,6 +627,14 @@ class ClimateFIRFilter: res_full.loc[res.coords] = res.transpose(*dims) return res_full, h, apriori + @staticmethod + def _create_time_range_extend(year, sampling, extend_length): + td_type = {"1d": "D", "1H": "h"}.get(sampling) + delta = np.timedelta64(extend_length + 1, td_type) + start = np.datetime64(f"{year}-01-01") - delta + end = np.datetime64(f"{year}-12-31") + delta + return slice(start, end) + @staticmethod def _create_tmp_dimension(data): new_dim = "window"