From 09d8ceda2a860b1ccfbad4ce8a07100ed9c56e55 Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Tue, 18 May 2021 15:07:20 +0200 Subject: [PATCH] use less memory intense version for clim_filter_vectorized to avoid memory issues --- mlair/helpers/filter.py | 76 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index b77a5910..ced279cc 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -93,7 +93,8 @@ class ClimateFIRFilter: input_data = data.__deepcopy__() for i in range(len(order)): # calculate climatological filter - clim_filter: Callable = {True: self.clim_filter_vectorized, False: self.clim_filter}[vectorized] + # clim_filter: Callable = {True: self.clim_filter_vectorized, False: self.clim_filter}[vectorized] + clim_filter: Callable = {True: self.clim_filter_vectorized_less_memory, False: self.clim_filter}[vectorized] fi, hi, apriori = clim_filter(input_data, fs, cutoff[i], order[i], apriori=apriori_list[i], sel_opts=sel_opts, sampling=sampling, time_dim=time_dim, window=window, @@ -353,6 +354,79 @@ class ClimateFIRFilter: res_full.loc[res.coords] = res return res_full, h, apriori + @TimeTrackingWrapper + def clim_filter_vectorized_less_memory(self, data, fs, cutoff_high, order, apriori=None, padlen_factor=0.5, + sel_opts=None, + sampling="1d", time_dim="datetime", var_dim="variables", window="hamming", + plot_index=None): + + # calculate apriori information from data if not given and extend its range if not sufficient long enough + if apriori is None: + apriori = self.create_monthly_mean(data, sel_opts=sel_opts, sampling=sampling, time_dim=time_dim) + apriori = self.extend_apriori(data, apriori, time_dim, sampling) + + # calculate FIR filter coefficients + h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window) + length = len(h) + + # create tmp dimension to apply filter, search for unused name + new_dim = self._create_tmp_dimension(data) + + coll = [] + + for var in data.coords[var_dim].values: + d = data.sel({var_dim: [var]}) + a = apriori.sel({var_dim: [var]}) + + # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] + history = self._shift_data(d, range(int(-(length - 1) / 2), 1), time_dim, var_dim, new_dim) + future = self._shift_data(a, range(1, int((length - 1) / 2) + 1), time_dim, var_dim, new_dim) + filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") + # filter_input_data = history.combine_first(future) + # history.sel(datetime=slice("2010-11-01", "2011-04-01"),variables="o3").plot() + # filter_input_data.sel(datetime=slice("2009-11-01", "2011-04-01"),variables="temp").plot() + + time_axis = filter_input_data.coords[time_dim] + # apply vectorized fir filter along the tmp dimension + kwargs = {"fs": fs, "cutoff_high": cutoff_high, "order": order, + "causal": False, "padlen": int(min(padlen_factor, 1) * length), "h": h} + # with TimeTracking(name="numpy_vec"): + # filt = fir_filter_numpy_vectorized(filter_input_data, var_dim, new_dim, kwargs) + # with TimeTracking(name="xr_apply_ufunc"): + # filt = xr.apply_ufunc(fir_filter_vectorized, filter_input_data, time_axis, + # input_core_dims=[[new_dim], []], output_core_dims=[[new_dim]], vectorize=True, + # kwargs=kwargs) + with TimeTracking(name="convolve"): + slicer = slice(int(-(length - 1) / 2), int((length - 1) / 2)) + filt = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data.sel({new_dim: slicer}), + input_core_dims=[[new_dim]], + output_core_dims=[[new_dim]], + vectorize=True, + kwargs={"h": h}) + + # plot + if self.plot_path is not None: + for i, time_pos in enumerate([0.25, 1.5, 2.75, 4]): # [0.25, 1.5, 2.75, 4] x 365 days + try: + pos = int(time_pos * 365 * fs) + filter_example = filter_input_data.isel({time_dim: pos}) + t0 = filter_example.coords[time_dim].values + t_slice = filter_input_data.isel( + {time_dim: slice(pos - int((length - 1) / 2), pos + int((length - 1) / 2) + 1)}).coords[ + time_dim].values + self.plot(d, filter_example, var_dim, time_dim, t_slice, t0, f"{plot_index}_{i}") + except IndexError: + pass + + # select only values at tmp dimension 0 at each point in time + coll.append(filt.sel({new_dim: 0}, drop=True)) + + res = xr.concat(coll, var_dim) + # create result array with same shape like input data, gabs are filled by nans + res_full = xr.ones_like(data) * np.nan + res_full.loc[res.coords] = res + return res_full, h, apriori + @staticmethod def _create_tmp_dimension(data): new_dim = "window" -- GitLab