diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py index 576fe9d7cab132abc2f3a3a9bd8791280ebbe618..8824acc214400aa2f877f01ee1ae27bb7382bc84 100644 --- a/mlair/data_handler/data_handler_with_filter.py +++ b/mlair/data_handler/data_handler_with_filter.py @@ -171,8 +171,8 @@ class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation): @TimeTrackingWrapper def apply_filter(self): """Apply FIR filter only on inputs.""" - fir = FIRFilter(self.input_data, self.fs, self.filter_order, self.filter_cutoff_freq, self.filter_window_type, - self.target_dim) + fir = FIRFilter(self.input_data.astype("float32"), self.fs, self.filter_order, self.filter_cutoff_freq, + self.filter_window_type, self.target_dim) self.fir_coeff = fir.filter_coefficients() fir_data = fir.filtered_data() if self._add_unfiltered is True: @@ -319,7 +319,8 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation """Apply FIR filter only on inputs.""" self.apriori = self.apriori.get(str(self)) if isinstance(self.apriori, dict) else self.apriori logging.info(f"{self.station}: call ClimateFIRFilter") - climate_filter = ClimateFIRFilter(self.input_data, self.fs, self.filter_order, self.filter_cutoff_freq, + climate_filter = ClimateFIRFilter(self.input_data.astype("float32"), self.fs, self.filter_order, + self.filter_cutoff_freq, self.filter_window_type, time_dim=self.time_dim, var_dim=self.target_dim, apriori_type=self.apriori_type, apriori=self.apriori, apriori_diurnal=self.apriori_diurnal, sel_opts=self.apriori_sel_opts, diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index c289df3b6e3ef89688267a2c3e0d067a64a113d2..7b9a175245a911e7d69bb90544ae025022488eeb 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -164,7 +164,7 @@ class ClimateFIRFilter: if pos + 1 < len(order): next_order = order[pos + 1] if minimum_length is not None: - next_order = max(next_order, minimum_length) + next_order = next_order + minimum_length return next_order if next_order > 0 else None @staticmethod @@ -517,6 +517,7 @@ class ClimateFIRFilter: # calculate apriori information from data if not given and extend its range if not sufficient long enough if apriori is None: apriori = self.create_monthly_mean(data, sel_opts=sel_opts, sampling=sampling, time_dim=time_dim) + apriori = apriori.astype(data.dtype) apriori = self.extend_apriori(data, apriori, time_dim, sampling) # calculate FIR filter coefficients @@ -526,6 +527,10 @@ class ClimateFIRFilter: # create tmp dimension to apply filter, search for unused name # new_dim = self._create_tmp_dimension(data) + # use filter length if no minimum is given, otherwise use minimum + half filter length for extension + extend_length_history = length if minimum_length is None else minimum_length + int((length + 1) / 2) + extend_length_future = int((length + 1) / 2) + 1 + coll = [] for var in reversed(data.coords[var_dim].values): @@ -537,10 +542,8 @@ class ClimateFIRFilter: filt_coll = [] for _year in range(_start, _end + 1): logging.info(f"{data.coords['Stations'].values[0]} ({var}): year={_year}") - extend_length = length if minimum_length is None else max(length, - minimum_length + int((length + 1) / 2)) - time_slice = self._create_time_range_extend(_year, sampling, extend_length) + time_slice = self._create_time_range_extend(_year, sampling, extend_length_history) d = data.sel({var_dim: [var], time_dim: time_slice}) a = apriori.sel({var_dim: [var], time_dim: time_slice}) if len(d.coords[time_dim]) == 0: # no data at all for this year @@ -549,18 +552,16 @@ class ClimateFIRFilter: # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] logging.info(f"{data.coords['Stations'].values[0]} ({var}): history") if new_dim not in d.coords: - history = self._shift_data(d, range(int(-extend_length), 1), time_dim, var_dim, new_dim) - gc.collect() + history = self._shift_data(d, range(int(-extend_length_history), 1), time_dim, var_dim, new_dim) else: - history = d.sel({new_dim: slice(int(-extend_length), 0)}) + history = d.sel({new_dim: slice(int(-extend_length_history), 0)}) logging.info(f"{data.coords['Stations'].values[0]} ({var}): future") diff = (a - history.sel(window=slice(-24, 1)).mean(new_dim)) if new_dim not in a.coords: - future = self._shift_data(a, range(1, int(extend_length + 1)), time_dim, var_dim, new_dim) + future = self._shift_data(a, range(1, extend_length_future), time_dim, var_dim, new_dim) # future = self._shift_data(a, range(1, int((length - 1) / 2) + 1), time_dim, var_dim, new_dim) - diff - gc.collect() else: - future = a.sel({new_dim: slice(1, int(extend_length + 1))}) + future = a.sel({new_dim: slice(1, extend_length_future)}) logging.info(f"{data.coords['Stations'].values[0]} ({var}): concat to filter input") filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") @@ -574,16 +575,11 @@ class ClimateFIRFilter: # history.sel(datetime=slice("2010-11-01", "2011-04-01"),variables="o3").plot() # filter_input_data.sel(datetime=slice("2009-11-01", "2011-04-01"),variables="temp").plot() # ToDo: remove all other filt methods, only keep the convolve one - time_axis = filter_input_data.coords[time_dim] - # apply vectorized fir filter along the tmp dimension - kwargs = {"fs": fs, "cutoff_high": cutoff_high, "order": order, - "causal": False, "padlen": int(min(padlen_factor, 1) * length), "h": h} - # with TimeTracking(name="numpy_vec"): - # filt = fir_filter_numpy_vectorized(filter_input_data, var_dim, new_dim, kwargs) - # with TimeTracking(name="xr_apply_ufunc"): - # filt = xr.apply_ufunc(fir_filter_vectorized, filter_input_data, time_axis, - # input_core_dims=[[new_dim], []], output_core_dims=[[new_dim]], vectorize=True, - # kwargs=kwargs) + # time_axis = filter_input_data.coords[time_dim] + # # apply vectorized fir filter along the tmp dimension + # kwargs = {"fs": fs, "cutoff_high": cutoff_high, "order": order, + # "causal": False, "padlen": int(min(padlen_factor, 1) * length), "h": h} + logging.info(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve") with TimeTracking(name="convolve"): # slicer = slice(int(-(length - 1) / 2), int((length - 1) / 2)) @@ -591,9 +587,13 @@ class ClimateFIRFilter: input_core_dims=[[new_dim]], output_core_dims=[[new_dim]], vectorize=True, - kwargs={"h": h}) + kwargs={"h": h}, + output_dtypes=[d.dtype]) - filt_coll.append(filt.sel({new_dim: slice(-extend_length, 0)}, drop=True)) + if minimum_length is None: + filt_coll.append(filt.sel({new_dim: slice(-extend_length_history, 0)}, drop=True)) + else: + filt_coll.append(filt.sel({new_dim: slice(-minimum_length, 0)}, drop=True)) # plot # ToDo: enable plotting again @@ -623,8 +623,11 @@ class ClimateFIRFilter: new_coords = {**{k: data.coords[k].values for k in data.coords if k != new_dim}, new_dim: res.coords[new_dim]} dims = [*data.dims, new_dim] if new_dim not in data.dims else data.dims - res_full = xr.DataArray(dims=dims, coords=new_coords) - res_full.loc[res.coords] = res.transpose(*dims) + res = res.transpose(*dims) + # res_full = xr.DataArray(dims=dims, coords=new_coords) + # res_full.loc[res.coords] = res + # res_full.compute() + res_full = res.broadcast_like(xr.DataArray(dims=dims, coords=new_coords)) return res_full, h, apriori @staticmethod diff --git a/mlair/helpers/helpers.py b/mlair/helpers/helpers.py index b57b733b08c4635a16d7fd18e99538a991521fd8..7eab911122cb4dc030887bed476a4e253553332e 100644 --- a/mlair/helpers/helpers.py +++ b/mlair/helpers/helpers.py @@ -179,3 +179,11 @@ def convert2xrda(arr: Union[xr.DataArray, xr.Dataset, np.ndarray, int, float], return xr.DataArray(arr, **kwargs) +def convert_size(size_bytes): + if size_bytes == 0: + return "0B" + size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB") + i = int(math.floor(math.log(size_bytes, 1024))) + p = math.pow(1024, i) + s = round(size_bytes / p, 2) + return "%s %s" % (s, size_name[i])