diff --git a/src/data_handling/bootstraps.py b/src/data_handling/bootstraps.py index 998ed8c6990d6a16388874c52faf4931ef4ba174..3c6c2c57f36bafd410987e721164ce2d52c98bfc 100644 --- a/src/data_handling/bootstraps.py +++ b/src/data_handling/bootstraps.py @@ -31,6 +31,11 @@ class BootStrapGenerator: """ return len(self.orig_generator)*self.boots*len(self.variables) + def get_labels(self, key): + _, label = self.orig_generator[key] + for _ in range(self.boots): + yield label + def get_generator(self): """ This is the implementation of the __next__ method of the iterator protocol. Get the data generator, and return @@ -52,10 +57,16 @@ class BootStrapGenerator: shuffled_var = shuffled_data.sel(variables=var, boots=boot).expand_dims("variables").drop("boots").transpose("datetime", "window", "Stations", "variables") boot_hist = boot_hist.combine_first(shuffled_var) boot_hist = boot_hist.sortby("variables") - self.bootstrap_meta.extend([var]*len_of_label) + self.bootstrap_meta.extend([[var, station]]*len_of_label) yield boot_hist, label return + def get_orig_prediction(self, path, file_name, prediction_name="CNN"): + file = os.path.join(path, file_name) + data = xr.open_dataarray(file) + for _ in range(self.boots): + yield data.sel(type=prediction_name).squeeze() + def load_boot_data(self, station): files = os.listdir(self.bootstrap_path) regex = re.compile(rf"{station}_\w*\.nc") @@ -85,6 +96,27 @@ class BootStraps(RunEnvironment): def get_boot_strap_generator_length(self): return self._boot_strap_generator.__len__() + def get_labels(self, key): + labels_list = [] + chunks = None + for labels in self._boot_strap_generator.get_labels(key): + if len(labels_list) == 0: + chunks = (100, labels.data.shape[1]) + labels_list.append(da.from_array(labels.data, chunks=chunks)) + labels_out = da.concatenate(labels_list, axis=0) + return labels_out.compute() + + def get_orig_prediction(self, path, name): + labels_list = [] + chunks = None + for labels in self._boot_strap_generator.get_orig_prediction(path, name): + if len(labels_list) == 0: + chunks = (100, labels.data.shape[1]) + labels_list.append(da.from_array(labels.data, chunks=chunks)) + labels_out = da.concatenate(labels_list, axis=0) + labels_out = labels_out.compute() + return labels_out[~np.isnan(labels_out).any(axis=1), :] + def get_chunk_size(self): hist, _ = self.data[0] return (100, *hist.shape[1:], self.number_bootstraps) diff --git a/src/run_modules/post_processing.py b/src/run_modules/post_processing.py index 8a0df43756acfdba0c86be7eecc8e4da37999ce3..0a791617a8c35b57cc3a646abb7e2e9fe5aa968c 100644 --- a/src/run_modules/post_processing.py +++ b/src/run_modules/post_processing.py @@ -49,27 +49,60 @@ class PostProcessing(RunEnvironment): self.make_prediction() logging.info("take a look on the next reported time measure. If this increases a lot, one should think to " "skip make_prediction() whenever it is possible to save time.") - self.skill_scores = self.calculate_skill_scores() - self.plot() + # self.skill_scores = self.calculate_skill_scores() + # self.plot() self.create_boot_straps() def create_boot_straps(self): + + # forecast + bootstrap_path = self.data_store.get("bootstrap_path", "general") forecast_path = self.data_store.get("forecast_path", "general") window_lead_time = self.data_store.get("window_lead_time", "general") - bootstraps = BootStraps(self.test_data, bootstrap_path, 20) + bootstraps = BootStraps(self.test_data, bootstrap_path, 2) with TimeTracking(name="boot predictions"): bootstrap_predictions = self.model.predict_generator(generator=bootstraps.boot_strap_generator(), steps=bootstraps.get_boot_strap_generator_length()) bootstrap_meta = np.array(bootstraps.get_boot_strap_meta()) - length = sum(bootstrap_meta == bootstrap_meta[0]) - variables = np.unique(bootstrap_meta) - for boot in variables: - ind = (bootstrap_meta == boot) - sel = bootstrap_predictions[ind].reshape((length, window_lead_time, 1)) - tmp = xr.DataArray(sel, coords=(range(length), range(window_lead_time), [boot]), dims=["index", "window", "boot"]) - file_name = os.path.join(forecast_path, f"bootstraps_{boot}.nc") - tmp.to_netcdf(file_name) + variables = np.unique(bootstrap_meta[:, 0]) + for station in np.unique(bootstrap_meta[:, 1]): + coords = None + for boot in variables: + ind = np.all(bootstrap_meta == [boot, station], axis=1) + length = sum(ind) + sel = bootstrap_predictions[ind].reshape((length, window_lead_time, 1)) + coords = (range(length), range(window_lead_time)) + tmp = xr.DataArray(sel, coords=(*coords, [boot]), dims=["index", "window", "type"]) + file_name = os.path.join(forecast_path, f"bootstraps_{boot}_{station}.nc") + tmp.to_netcdf(file_name) + labels = bootstraps.get_labels(station).reshape((length, window_lead_time, 1)) + file_name = os.path.join(forecast_path, f"bootstraps_labels_{station}.nc") + labels = xr.DataArray(labels, coords=(*coords, ["obs"]), dims=["index", "window", "type"]) + labels.to_netcdf(file_name) + + # file_name = os.path.join(forecast_path, f"bootstraps_orig.nc") + # orig = xr.open_dataarray(file_name) + + # calc skill scores + skill_scores = statistics.SkillScores(None) + score = {} + for station in np.unique(bootstrap_meta[:, 1]): + file_name = os.path.join(forecast_path, f"bootstraps_labels_{station}.nc") + labels = xr.open_dataarray(file_name) + shape = labels.shape + orig = bootstraps.get_orig_prediction(forecast_path, f"forecasts_norm_{station}_test.nc").reshape(shape) + orig = xr.DataArray(orig, coords=(range(shape[0]), range(shape[1]), ["orig"]), dims=["index", "window", "type"]) + score[station] = {} + for boot in variables: + file_name = os.path.join(forecast_path, f"bootstraps_{boot}_{station}.nc") + boot_data = xr.open_dataarray(file_name) + boot_data = boot_data.combine_first(labels) + boot_data = boot_data.combine_first(orig) + score[station][boot] = skill_scores.general_skill_score(boot_data, forecast_name=boot, reference_name="orig") + + # plot + def _load_model(self): try: @@ -116,64 +149,72 @@ class PostProcessing(RunEnvironment): logging.debug("start make_prediction") for i, _ in enumerate(self.test_data): data = self.test_data.get_data_generator(i) - - nn_prediction, persistence_prediction, ols_prediction = self._create_empty_prediction_arrays(data, count=3) input_data = data.get_transposed_history() # get scaling parameters mean, std, transformation_method = data.get_transformation_information(variable=self.target_var) - # nn forecast - nn_prediction = self._create_nn_forecast(input_data, nn_prediction, mean, std, transformation_method) + for normalised in [True, False]: + # create empty arrays + nn_prediction, persistence_prediction, ols_prediction = self._create_empty_prediction_arrays(data, count=3) + + # nn forecast + nn_prediction = self._create_nn_forecast(input_data, nn_prediction, mean, std, transformation_method, normalised) - # persistence - persistence_prediction = self._create_persistence_forecast(input_data, persistence_prediction, mean, std, - transformation_method) + # persistence + persistence_prediction = self._create_persistence_forecast(input_data, persistence_prediction, mean, std, + transformation_method, normalised) - # ols - ols_prediction = self._create_ols_forecast(input_data, ols_prediction, mean, std, transformation_method) + # ols + ols_prediction = self._create_ols_forecast(input_data, ols_prediction, mean, std, transformation_method, normalised) - # observation - observation = self._create_observation(data, None, mean, std, transformation_method) + # observation + observation = self._create_observation(data, None, mean, std, transformation_method, normalised) - # merge all predictions - full_index = self.create_fullindex(data.data.indexes['datetime'], self._get_frequency()) - all_predictions = self.create_forecast_arrays(full_index, list(data.label.indexes['window']), - CNN=nn_prediction, - persi=persistence_prediction, - obs=observation, - OLS=ols_prediction) + # merge all predictions + full_index = self.create_fullindex(data.data.indexes['datetime'], self._get_frequency()) + all_predictions = self.create_forecast_arrays(full_index, list(data.label.indexes['window']), + CNN=nn_prediction, + persi=persistence_prediction, + obs=observation, + OLS=ols_prediction) - # save all forecasts locally - path = self.data_store.get("forecast_path", "general") - file = os.path.join(path, f"forecasts_{data.station[0]}_test.nc") - all_predictions.to_netcdf(file) + # save all forecasts locally + path = self.data_store.get("forecast_path", "general") + prefix = "forecasts_norm" if normalised else "forecasts" + file = os.path.join(path, f"{prefix}_{data.station[0]}_test.nc") + all_predictions.to_netcdf(file) def _get_frequency(self): getter = {"daily": "1D", "hourly": "1H"} return getter.get(self._sampling, None) @staticmethod - def _create_observation(data, _, mean, std, transformation_method): - return statistics.apply_inverse_transformation(data.label.copy(), mean, std, transformation_method) + def _create_observation(data, _, mean, std, transformation_method, normalised): + obs = data.label.copy() + if not normalised: + obs = statistics.apply_inverse_transformation(obs, mean, std, transformation_method) + return obs - def _create_ols_forecast(self, input_data, ols_prediction, mean, std, transformation_method): + def _create_ols_forecast(self, input_data, ols_prediction, mean, std, transformation_method, normalised): tmp_ols = self.ols_model.predict(input_data) - tmp_ols = statistics.apply_inverse_transformation(tmp_ols, mean, std, transformation_method) + if not normalised: + tmp_ols = statistics.apply_inverse_transformation(tmp_ols, mean, std, transformation_method) tmp_ols = np.expand_dims(tmp_ols, axis=1) target_shape = ols_prediction.values.shape ols_prediction.values = np.swapaxes(tmp_ols, 2, 0) if target_shape != tmp_ols.shape else tmp_ols return ols_prediction - def _create_persistence_forecast(self, input_data, persistence_prediction, mean, std, transformation_method): + def _create_persistence_forecast(self, input_data, persistence_prediction, mean, std, transformation_method, normalised): tmp_persi = input_data.sel({'window': 0, 'variables': self.target_var}) - tmp_persi = statistics.apply_inverse_transformation(tmp_persi, mean, std, transformation_method) + if not normalised: + tmp_persi = statistics.apply_inverse_transformation(tmp_persi, mean, std, transformation_method) window_lead_time = self.data_store.get("window_lead_time", "general") persistence_prediction.values = np.expand_dims(np.tile(tmp_persi.squeeze('Stations'), (window_lead_time, 1)), axis=1) return persistence_prediction - def _create_nn_forecast(self, input_data, nn_prediction, mean, std, transformation_method): + def _create_nn_forecast(self, input_data, nn_prediction, mean, std, transformation_method, normalised): """ create the nn forecast for given input data. Inverse transformation is applied to the forecast to get the output in the original space. Furthermore, only the output of the main branch is returned (not all minor branches, if @@ -186,7 +227,8 @@ class PostProcessing(RunEnvironment): :return: """ tmp_nn = self.model.predict(input_data) - tmp_nn = statistics.apply_inverse_transformation(tmp_nn, mean, std, transformation_method) + if not normalised: + tmp_nn = statistics.apply_inverse_transformation(tmp_nn, mean, std, transformation_method) if tmp_nn.ndim == 3: nn_prediction.values = np.swapaxes(np.expand_dims(tmp_nn[-1, ...], axis=1), 2, 0) elif tmp_nn.ndim == 2: