diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py
index c33da2a9337642d891918462722d30f7fd12b772..576fe9d7cab132abc2f3a3a9bd8791280ebbe618 100644
--- a/mlair/data_handler/data_handler_with_filter.py
+++ b/mlair/data_handler/data_handler_with_filter.py
@@ -323,7 +323,8 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
                                           self.filter_window_type, time_dim=self.time_dim, var_dim=self.target_dim,
                                           apriori_type=self.apriori_type, apriori=self.apriori,
                                           apriori_diurnal=self.apriori_diurnal, sel_opts=self.apriori_sel_opts,
-                                          plot_path=self.plot_path, plot_name=str(self))
+                                          plot_path=self.plot_path, plot_name=str(self),
+                                          minimum_length=self.window_history_size, new_dim=self.window_dim)
         self.climate_filter_coeff = climate_filter.filter_coefficients
 
         # store apriori information: store all if residuum_stat method was used, otherwise just store initial apriori
@@ -332,15 +333,24 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
         else:
             self.apriori = climate_filter.initial_apriori_data
         self.all_apriori = climate_filter.apriori_data
-        climate_filter_data = climate_filter.filtered_data
+
+        climate_filter_data = [c.sel({self.window_dim: slice(-self.window_history_size, 0)}) for c in
+                               climate_filter.filtered_data]
+        # climate_filter_data = climate_filter.filtered_data
+
+        # create input data with filter index
+        input_data = xr.concat(climate_filter_data, pd.Index(self.create_filter_index(), name=self.filter_dim))
+        # self.input_data = xr.concat([c.sel(window=slice(-self.window_history_size, 0)) for c in climate_filter_data], pd.Index(self.create_filter_index(), name=self.filter_dim))
 
         # add unfiltered raw data
         if self._add_unfiltered is True:
-            climate_filter_data.append(self.input_data)
+            data_raw = self.shift(self.input_data, self.time_dim, -self.window_history_size)
+            data_raw = data_raw.expand_dims({self.filter_dim: ["unfiltered"]}, -1)
+            input_data = xr.concat([input_data, data_raw], self.filter_dim)
 
-        # create input data with filter index
-        self.input_data = xr.concat(climate_filter_data, pd.Index(self.create_filter_index(), name=self.filter_dim))
+        self.input_data = input_data
+        # self.history = self.shift(data, dim_name_of_shift, window, offset=self.window_history_offset)
 
         # this is just a code snippet to check the results of the filter
         # import matplotlib
         # matplotlib.use("TkAgg")
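The two hunks above trim each filtered component to the last window_history_size + 1 window steps and stack the components along the filter dimension, appending the raw series under an extra "unfiltered" label. A minimal toy sketch of that stacking; the dimension names "window" and "filter", the band labels, and all sizes are invented stand-ins for self.window_dim, self.filter_dim, and the labels from create_filter_index():

    import numpy as np
    import pandas as pd
    import xarray as xr

    window_history_size = 3

    def component(seed):
        # one filtered component with a window axis covering [-window_history_size, 0]
        rng = np.random.default_rng(seed)
        return xr.DataArray(rng.random((5, window_history_size + 1)),
                            dims=("datetime", "window"),
                            coords={"datetime": pd.date_range("2020-01-01", periods=5, freq="D"),
                                    "window": range(-window_history_size, 1)})

    # stack the components along a fresh "filter" dimension
    stacked = xr.concat([component(0), component(1)],
                        dim=pd.Index(["band_0", "band_1"], name="filter"))

    # append the raw series under the label "unfiltered", mirroring the _add_unfiltered branch
    stacked = xr.concat([stacked, component(2).expand_dims({"filter": ["unfiltered"]})], dim="filter")
    print(dict(stacked.sizes))  # {'filter': 3, 'datetime': 5, 'window': 4}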
@@ -397,6 +407,27 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
     def _period_to_freq(cutoff_p):
         return [1. / x for x in cutoff_p]
 
+    def make_history_window(self, dim_name_of_inputs: str, window: int, dim_name_of_shift: str) -> None:
+        """
+        Create an xr.DataArray containing history data.
+
+        Shift the data window+1 times and return an xarray which has a new dimension 'window' containing the shifted
+        data. This is used to represent history in the data. Results are stored in the history attribute.
+
+        :param dim_name_of_inputs: Name of the dimension which contains the input variables
+        :param window: number of time steps to look back in history
+            Note: window will be treated as a negative value. This should be in agreement with looking back on
+            a time line. Nonetheless, positive values are allowed but are converted to their negative
+            expression.
+        :param dim_name_of_shift: Dimension along which the shift is applied
+        """
+        data = self.input_data
+        sampling = {"daily": "D", "hourly": "h"}.get(to_list(self.sampling)[0])
+        data.coords[dim_name_of_shift] = data.coords[dim_name_of_shift] - np.timedelta64(self.window_history_offset,
+                                                                                         sampling)
+        data.coords[self.window_dim] = data.coords[self.window_dim] + self.window_history_offset
+        self.history = data
+
 
 class DataHandlerClimateFirFilter(DefaultDataHandler):
     """Data handler using climatic adjusted FIR filtered data."""
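make_history_window above does not move any data: the filtered input already carries a window dimension, so only the time coordinate is shifted back by window_history_offset steps and the window axis is renumbered to match. A self-contained sketch of the same relabelling trick; dimension names, daily sampling, and the offset of 2 are chosen purely for illustration:

    import numpy as np
    import pandas as pd
    import xarray as xr

    offset = 2  # stands in for self.window_history_offset
    da = xr.DataArray(np.arange(12, dtype=float).reshape(4, 3),
                      dims=("datetime", "window"),
                      coords={"datetime": pd.date_range("2020-01-01", periods=4, freq="D"),
                              "window": range(-2, 1)})

    # relabel only: the value stored at (t0, w) is untouched, but it is now addressed
    # as belonging to the anchor time t0 - offset and window position w + offset
    da.coords["datetime"] = da.coords["datetime"].values - np.timedelta64(offset, "D")
    da.coords["window"] = da.coords["window"].values + offset
    print(da.coords["window"].values)  # [0 1 2]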
diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py
index 3cb07140d4e3c6d729ae9bf1af436b2698e33c70..d9226b7c52dcd7187916ac34c25e20360d0a0e0b 100644
--- a/mlair/helpers/filter.py
+++ b/mlair/helpers/filter.py
@@ -58,7 +58,9 @@ class ClimateFIRFilter:
 
     def __init__(self, data, fs, order, cutoff, window, time_dim, var_dim, apriori=None, apriori_type=None,
                  apriori_diurnal=False, sel_opts=None, plot_path=None, plot_name=None, vectorized=True,
-                 padlen_factor=0.8):
+                 padlen_factor=0.8, minimum_length=None, new_dim=None):
         """
         :param data: data to filter
         :param fs: sampling frequency in 1/days -> 1d: fs=1 -> 1H: fs=24
+        :param minimum_length: number of time steps before t0 that each filtered component must at least cover
+        :param new_dim: name of the window dimension used to apply the filter (a temporary name is created if None)
@@ -95,30 +97,45 @@ class ClimateFIRFilter:
             logging.info(f"{plot_name}: apriori shape = {apriori.shape}")
         apriori_list = to_list(apriori)
         input_data = data.__deepcopy__()
+
+        # create tmp dimension to apply filter, search for unused name
+        new_dim = self._create_tmp_dimension(input_data) if new_dim is None else new_dim
+
         for i in range(len(order)):
             logging.info(f"{plot_name}: start filter for order {order[i]}")
             # calculate climatological filter
             # clim_filter: Callable = {True: self.clim_filter_vectorized, False: self.clim_filter}[vectorized]
+            # ToDo: remove all methods except the vectorized version
             clim_filter: Callable = {True: self.clim_filter_vectorized_less_memory, False: self.clim_filter}[vectorized]
+            _minimum_length = self._minimum_length(order, minimum_length, i)
             fi, hi, apriori = clim_filter(input_data, fs, cutoff[i], order[i],
                                           apriori=apriori_list[i],
                                           sel_opts=sel_opts, sampling=sampling, time_dim=time_dim, window=window,
-                                          var_dim=var_dim, plot_index=i, padlen_factor=padlen_factor)
+                                          var_dim=var_dim, plot_index=i, padlen_factor=padlen_factor,
+                                          minimum_length=_minimum_length, new_dim=new_dim)
 
             logging.info(f"{plot_name}: finished clim_filter calculation")
-            filtered.append(fi)
+            if minimum_length is None:
+                filtered.append(fi)
+            else:
+                filtered.append(fi.sel({new_dim: slice(-minimum_length, 0)}))
             h.append(hi)
             gc.collect()
 
             # calculate residuum
             logging.info(f"{plot_name}: calculate residuum")
-            input_data = input_data - fi
+            coord_range = range(fi.coords[new_dim].values.min(), fi.coords[new_dim].values.max() + 1)
+            if new_dim in input_data.coords:
+                input_data = input_data.sel({new_dim: coord_range}) - fi
+            else:
+                input_data = self._shift_data(input_data, coord_range, time_dim, var_dim, new_dim) - fi
 
             # create new apriori information for next iteration if no further apriori is provided
             if len(apriori_list) <= i + 1:
                 logging.info(f"{plot_name}: create diurnal_anomalies")
                 if apriori_diurnal is True and sampling == "1H":
-                    diurnal_anomalies = self.create_hourly_mean(input_data, sel_opts=sel_opts, sampling=sampling,
+                    diurnal_anomalies = self.create_hourly_mean(input_data.sel({new_dim: 0}, drop=True),
+                                                                sel_opts=sel_opts, sampling=sampling,
                                                                 time_dim=time_dim, as_anomaly=True)
                 else:
                     diurnal_anomalies = 0
@@ -126,17 +143,32 @@ class ClimateFIRFilter:
                 if apriori_type is None or apriori_type == "zeros":  # zero version
                     apriori_list.append(xr.zeros_like(apriori_list[i]) + diurnal_anomalies)
                 elif apriori_type == "residuum_stats":  # calculate monthly statistic on residuum
-                    apriori_list.append(-self.create_monthly_mean(input_data, sel_opts=sel_opts, sampling=sampling,
-                                                                  time_dim=time_dim) + diurnal_anomalies)
+                    apriori_list.append(
+                        -self.create_monthly_mean(input_data.sel({new_dim: 0}, drop=True), sel_opts=sel_opts,
+                                                  sampling=sampling,
+                                                  time_dim=time_dim) + diurnal_anomalies)
                 else:
                     raise ValueError(f"Cannot handle unkown apriori type: {apriori_type}. Please choose from None, "
                                      f"`zeros` or `residuum_stats`.")
 
         # add last residuum to filtered
-        filtered.append(input_data)
+        if minimum_length is None:
+            filtered.append(input_data)
+        else:
+            filtered.append(input_data.sel({new_dim: slice(-minimum_length, 0)}))
+        # filtered.append(input_data)
 
         self._filtered = filtered
         self._h = h
         self._apriori = apriori_list
 
+    @staticmethod
+    def _minimum_length(order, minimum_length, pos):
+        next_order = 0
+        if pos + 1 < len(order):
+            next_order = order[pos + 1]
+        if minimum_length is not None:
+            next_order = max(next_order, minimum_length)
+        return next_order if next_order > 0 else None
+
     @staticmethod
     def create_unity_array(data, time_dim, extend_range=366):
         """Create a xr data array filled with ones. time_dim is extended by extend_range days in future and past."""
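Each level of the decomposition filters the residuum of the previous level, so level i has to keep enough window steps for whatever consumes its output: the next level's filter order, or the externally requested minimum_length at the last level. A standalone copy of the _minimum_length bookkeeping above, with made-up numbers:

    # copy of ClimateFIRFilter._minimum_length, with invented orders for illustration
    def _minimum_length(order, minimum_length, pos):
        next_order = 0
        if pos + 1 < len(order):
            next_order = order[pos + 1]
        if minimum_length is not None:
            next_order = max(next_order, minimum_length)
        return next_order if next_order > 0 else None

    order = [42, 11]                        # filter order per decomposition level
    print(_minimum_length(order, 5, 0))     # 11 -> keep enough residuum for the next filter
    print(_minimum_length(order, 5, 1))     # 5  -> last level: only the requested history
    print(_minimum_length(order, None, 1))  # None -> nothing beyond the filter output needed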
@@ -393,11 +425,94 @@ class ClimateFIRFilter:
         res_full.loc[res.coords] = res
         return res_full, h, apriori
 
+    def _tmp_analysis(self, data, apriori, var, var_dim, length, time_dim, new_dim, h):
+        logging.info(f"{data.coords['Stations'].values[0]} ({var}): sel data")
+        d = data.sel({var_dim: [var]}).sel(datetime=slice("2007", "2010"))
+        a = apriori.sel({var_dim: [var]}).sel(datetime=slice("2007", "2010"))
+
+        # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length]
+        history = self._shift_data(d, range(-length, 1), time_dim, var_dim, new_dim)
+
+        future = self._shift_data(d, range(1, length), time_dim, var_dim, new_dim)
+        filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left")
+        logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve")
+        with TimeTracking(name="convolve"):
+            filt_nc = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data,
+                                     input_core_dims=[[new_dim]],
+                                     output_core_dims=[[new_dim]],
+                                     vectorize=True,
+                                     kwargs={"h": h})
+
+        future = self._shift_data(a, range(1, length), time_dim, var_dim, new_dim)
+        filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left")
+        logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve")
+        with TimeTracking(name="convolve"):
+            filt_t0 = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data,
+                                     input_core_dims=[[new_dim]],
+                                     output_core_dims=[[new_dim]],
+                                     vectorize=True,
+                                     kwargs={"h": h})
+
+        diff = (a - history.sel(window=slice(-24, 1)).mean(new_dim))
+        future = self._shift_data(a, range(1, length), time_dim, var_dim, new_dim) - diff
+        filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left")
+        logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve")
+        with TimeTracking(name="convolve"):
+            filt_diff1d = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data,
+                                         input_core_dims=[[new_dim]],
+                                         output_core_dims=[[new_dim]],
+                                         vectorize=True,
+                                         kwargs={"h": h})
+
+        diff = (a - history.sel(window=slice(-24 * 7, 1)).mean(new_dim))
+        future = self._shift_data(a, range(1, length), time_dim, var_dim, new_dim) - diff
+        filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left")
+        logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve")
+        with TimeTracking(name="convolve"):
+            filt_diff1w = xr.apply_ufunc(fir_filter_convolve_vectorized,
+                                         filter_input_data,
+                                         input_core_dims=[[new_dim]],
+                                         output_core_dims=[[new_dim]],
+                                         vectorize=True,
+                                         kwargs={"h": h})
+
+        diff = (a - history.sel(window=slice(-24 * 7, 1)).mean(new_dim))
+        future = self._shift_data(a, range(1, length), time_dim, var_dim, new_dim)
+        diff = xr.zeros_like(future) + diff
+        lam = np.log(2) / (7 * 24)
+        diff = diff * np.exp(- lam * diff.coords["window"])
+        future = future - diff
+        filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left")
+        logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve")
+        with TimeTracking(name="convolve"):
+            filt_diff1w_decay = xr.apply_ufunc(fir_filter_convolve_vectorized,
+                                               filter_input_data,
+                                               input_core_dims=[[new_dim]],
+                                               output_core_dims=[[new_dim]],
+                                               vectorize=True,
+                                               kwargs={"h": h})
+
+        t0 = datetime.datetime.strptime("2009-07-15 00:00", "%Y-%m-%d %H:%M")
+        delta = datetime.timedelta(hours=1)
+        for i in range(int((length - 1) / 2)):
+            plt.plot(-i, filt_nc.sel(datetime=t0 - i * delta, window=0), marker="+", color="black")
+        filt_nc.sel(datetime=t0).plot(label="noncausal")
+        filt_t0.sel(datetime=t0).plot(label="nodiff")
+        filt_diff1d.sel(datetime=t0).plot(label="diff1d")
+        filt_diff1w.sel(datetime=t0).plot(label="diff1w")
+        filt_diff1w_decay.sel(datetime=t0).plot(label="diff1wdecay")
+        plt.legend()
+
+        for i in range(int((length - 1) / 2)):
+            plt.plot(-i, filt_t0.sel(datetime=t0 - i * delta, window=0), marker="+", color="black")
+
+        z = 1
+
     @TimeTrackingWrapper
     def clim_filter_vectorized_less_memory(self, data, fs, cutoff_high, order, apriori=None, padlen_factor=0.5,
                                            sel_opts=None, sampling="1d", time_dim="datetime", var_dim="variables",
                                            window="hamming",
-                                           plot_index=None):
+                                           plot_index=None, minimum_length=None, new_dim="window"):
 
         logging.info(f"{data.coords['Stations'].values[0]}: extend apriori")
@@ -411,28 +526,39 @@ class ClimateFIRFilter:
         length = len(h)
 
         # create tmp dimension to apply filter, search for unused name
-        new_dim = self._create_tmp_dimension(data)
+        # new_dim = self._create_tmp_dimension(data)
 
         coll = []
 
-        for var in data.coords[var_dim].values:
+        for var in reversed(data.coords[var_dim].values):
+            # self._tmp_analysis(data, apriori, var, var_dim, length, time_dim, new_dim, h)
             logging.info(f"{data.coords['Stations'].values[0]} ({var}): sel data")
             d = data.sel({var_dim: [var]})
             a = apriori.sel({var_dim: [var]})
 
             # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length]
             logging.info(f"{data.coords['Stations'].values[0]} ({var}): history")
-            history = self._shift_data(d, range(int(-(length - 1) / 2), 1), time_dim, var_dim, new_dim)
-            gc.collect()
+            extend_length = length if minimum_length is None else max(length, minimum_length + int((length + 1) / 2))
+            if new_dim not in d.coords:
+                history = self._shift_data(d, range(int(-extend_length), 1), time_dim, var_dim, new_dim)
+                gc.collect()
+            else:
+                history = d.sel({new_dim: slice(int(-extend_length), 0)})
 
             logging.info(f"{data.coords['Stations'].values[0]} ({var}): future")
-            future = self._shift_data(a, range(1, int((length - 1) / 2) + 1), time_dim, var_dim, new_dim)
-            gc.collect()
+            diff = (a - history.sel(window=slice(-24, 1)).mean(new_dim))
+            if new_dim not in a.coords:
+                future = self._shift_data(a, range(1, int(extend_length + 1)), time_dim, var_dim, new_dim)
+                # future = self._shift_data(a, range(1, int((length - 1) / 2) + 1), time_dim, var_dim, new_dim) - diff
+                gc.collect()
+            else:
+                future = a.sel({new_dim: slice(1, int(extend_length + 1))})
 
             logging.info(f"{data.coords['Stations'].values[0]} ({var}): concat to filter input")
+
             filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left")
             # filter_input_data = history.combine_first(future)
             # history.sel(datetime=slice("2010-11-01", "2011-04-01"),variables="o3").plot()
             # filter_input_data.sel(datetime=slice("2009-11-01", "2011-04-01"),variables="temp").plot()
-
+            # ToDo: remove all other filt methods, only keep the convolve one
             time_axis = filter_input_data.coords[time_dim]
             # apply vectorized fir filter along the tmp dimension
             kwargs = {"fs": fs, "cutoff_high": cutoff_high, "order": order,
@@ -445,14 +571,15 @@ class ClimateFIRFilter:
             #                       kwargs=kwargs)
             logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve")
             with TimeTracking(name="convolve"):
-                slicer = slice(int(-(length - 1) / 2), int((length - 1) / 2))
-                filt = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data.sel({new_dim: slicer}),
+                # slicer = slice(int(-(length - 1) / 2), int((length - 1) / 2))
+                filt = xr.apply_ufunc(fir_filter_convolve_vectorized, filter_input_data,  # .sel({new_dim: slicer}),
                                       input_core_dims=[[new_dim]],
                                       output_core_dims=[[new_dim]],
                                       vectorize=True,
                                       kwargs={"h": h})
 
             # plot
+            # ToDo: enable plotting again
             if self.plot_path is not None:
                 for i, time_pos in enumerate([0.25, 1.5, 2.75, 4]):  # [0.25, 1.5, 2.75, 4] x 365 days
                     try:
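The convolution itself runs through xr.apply_ufunc with the tmp dimension as the only core dimension, so the kernel function sees plain 1-D arrays and is looped over all remaining dimensions. A runnable sketch of that pattern with a stand-in kernel; fir_filter_convolve_vectorized is assumed here to boil down to a same-length 1-D convolution with the FIR coefficients h, and kernel length, cutoff, and array sizes are invented:

    import numpy as np
    import xarray as xr
    from scipy.signal import firwin

    def convolve_1d(data, h):
        # stand-in for fir_filter_convolve_vectorized: same-length convolution over one axis
        return np.convolve(data, h, mode="same")

    h = firwin(21, 0.1)  # low-pass FIR coefficients (order and cutoff are made up)
    arr = xr.DataArray(np.random.default_rng(0).random((4, 100)), dims=("datetime", "window"))

    # "window" is declared the core dimension; vectorize=True loops convolve_1d over "datetime"
    filt = xr.apply_ufunc(convolve_1d, arr,
                          input_core_dims=[["window"]],
                          output_core_dims=[["window"]],
                          vectorize=True,
                          kwargs={"h": h})
    print(filt.dims, filt.shape)  # ('datetime', 'window') (4, 100)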
@@ -462,20 +589,24 @@ class ClimateFIRFilter:
                         t_slice = filter_input_data.isel(
                             {time_dim: slice(pos - int((length - 1) / 2), pos + int((length - 1) / 2) + 1)}).coords[
                             time_dim].values
-                        self.plot(d, filter_example, var_dim, time_dim, t_slice, t0, f"{plot_index}_{i}")
+                        # self.plot(d, filter_example, var_dim, time_dim, t_slice, t0, f"{plot_index}_{i}")
                     except IndexError:
                         pass
 
             # select only values at tmp dimension 0 at each point in time
-            coll.append(filt.sel({new_dim: 0}, drop=True))
+            # coll.append(filt.sel({new_dim: 0}, drop=True))
+            coll.append(filt.sel({new_dim: slice(-extend_length, 0)}, drop=True))
             gc.collect()
 
         logging.info(f"{data.coords['Stations'].values[0]}: concat all variables")
         res = xr.concat(coll, var_dim)
 
         # create result array with same shape like input data, gabs are filled by nans
         logging.info(f"{data.coords['Stations'].values[0]}: create res_full")
-        res_full = xr.ones_like(data) * np.nan
-        res_full.loc[res.coords] = res
+
+        new_coords = {**{k: data.coords[k].values for k in data.coords if k != new_dim}, new_dim: res.coords[new_dim]}
+        dims = [*data.dims, new_dim] if new_dim not in data.dims else data.dims
+        res_full = xr.DataArray(dims=dims, coords=new_coords)
+        res_full.loc[res.coords] = res.transpose(*dims)
 
         return res_full, h, apriori
 
     @staticmethod
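The reworked res_full block above no longer mirrors the input shape; it allocates a NaN-filled array over the full time axis plus the new window dimension and then writes res into the covered positions only. A toy reproduction of that allocate-and-fill step, with all names and sizes invented:

    import numpy as np
    import xarray as xr

    data = xr.DataArray(np.zeros((6, 2)), dims=("datetime", "variables"),
                        coords={"datetime": range(6), "variables": ["o3", "temp"]})
    res = xr.DataArray(np.ones((4, 2, 3)), dims=("datetime", "variables", "window"),
                       coords={"datetime": range(1, 5), "variables": ["o3", "temp"],
                               "window": range(-2, 1)})

    # NaN-filled template spanning the full coords of `data` plus the new window axis
    new_coords = {**{k: data.coords[k].values for k in data.coords}, "window": res.coords["window"]}
    res_full = xr.DataArray(dims=("datetime", "variables", "window"), coords=new_coords)

    # fill only the positions covered by `res`; datetimes 0 and 5 stay NaN
    res_full.loc[res.coords] = res.transpose("datetime", "variables", "window")
    print(res_full.sel(datetime=0).isnull().all().item())  # True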