diff --git a/src/run_modules/post_processing.py b/src/run_modules/post_processing.py
index a0cc4e4051bd2375098b91b682fef2dbbad98720..3d4238b8b25ee432feea0f8c0dcc009d60129ad5 100644
--- a/src/run_modules/post_processing.py
+++ b/src/run_modules/post_processing.py
@@ -2,6 +2,7 @@
 __author__ = "Lukas Leufen, Felix Kleinert"
 __date__ = '2019-12-11'
 
+import inspect
 import logging
 import os
 
@@ -23,6 +24,8 @@
 from src.plotting.postprocessing_plotting import PlotMonthlySummary, PlotStation
 from src.plotting.postprocessing_plotting import plot_conditional_quantiles
 from src.run_modules.run_environment import RunEnvironment
+from typing import Dict
+
 
 
 class PostProcessing(RunEnvironment):
@@ -54,10 +57,7 @@ class PostProcessing(RunEnvironment):
 
         # bootstraps
         if self.data_store.get("evaluate_bootstraps", "general.postprocessing"):
-            # bootstrap_path = self.data_store.get("bootstrap_path", "general.postprocessing")
-            # number_of_bootstraps = self.data_store.get("number_of_bootstraps", "general.postprocessing")
-            # BootStraps(self.test_data, bootstrap_path, number_of_bootstraps)
-            with TimeTracking(name="split (refac_1): create_boot_straps_refac_2()"):
+            with TimeTracking(name="calculate bootstraps"):
                 create_new_bootstraps = self.data_store.get("create_new_bootstraps", "general.postprocessing")
                 self.bootstrap_postprocessing(create_new_bootstraps)
 
@@ -67,28 +67,42 @@ class PostProcessing(RunEnvironment):
         # plotting
         self.plot()
 
-    def bootstrap_postprocessing(self, create_new_bootstraps, _iter=0):
+    def bootstrap_postprocessing(self, create_new_bootstraps: bool, _iter: int = 0) -> None:
+        """
+        Create skill scores of bootstrapped data. Also creates these bootstraps if create_new_bootstraps is true or a
+        failure occurred during skill score calculation. Sets class attribute bootstrap_skill_scores.
+        :param create_new_bootstraps: calculate all bootstrap predictions and overwrite already available predictions
+        :param _iter: internal counter to reduce unnecessary recursive calls (maximum number is 2, otherwise something
+            went wrong).
+        """
         try:
             if create_new_bootstraps:
-                self.create_boot_straps()
+                self.create_bootstrap_forecast()
             self.bootstrap_skill_scores = self.calculate_bootstrap_skill_scores()
         except FileNotFoundError:
             if _iter != 0:
                 raise RuntimeError("bootstrap_postprocessing is called for the 2nd time. This means, that calling"
-                                   "create_boot_straps() couldn't solve the FileNotFoundError. Therefore, please check"
                                    "manually the reason for the failure.")
             logging.info("Couldn't load all files, restart bootstrap postprocessing with create_new_bootstraps=True.")
             self.bootstrap_postprocessing(True, _iter=1)
 
-    def create_boot_straps(self):
+    def create_bootstrap_forecast(self) -> None:
+        """
+        Creates the bootstrapped predictions for all stations and variables. These forecasts are saved in bootstrap_path
+        with the names `bootstraps_{var}_{station}.nc` and `bootstraps_labels_{station}.nc`.
+        """
         # forecast
-        with TimeTracking(name="boot predictions"):
+        with TimeTracking(name=inspect.stack()[0].function):
+            # extract all requirements from data store
            bootstrap_path = self.data_store.get("bootstrap_path", "general")
            forecast_path = self.data_store.get("forecast_path", "general")
            window_lead_time = self.data_store.get("window_lead_time", "general")
            number_of_bootstraps = self.data_store.get("number_of_bootstraps", "general.postprocessing")
+
+            # set bootstrap class
            bootstraps = BootStraps(self.test_data, bootstrap_path, number_of_bootstraps)
 
+            # create bootstrapped predictions for all stations and variables and save them to disk
            for station in bootstraps.stations:
                with TimeTracking(name=station):
                    logging.info(station)
@@ -99,7 +113,7 @@ class PostProcessing(RunEnvironment):
                         bootstrap_predictions = self.model.predict_generator(generator=station_bootstrap(),
                                                                              steps=length,
                                                                              use_multiprocessing=True)
-                        if isinstance(bootstrap_predictions, list):
+                        if isinstance(bootstrap_predictions, list):  # if model is branched model
                             bootstrap_predictions = bootstrap_predictions[-1]
                         # get bootstrap prediction meta data
                         bootstrap_meta = np.array(bootstraps.get_bootstrap_meta_station_var_wise(station, var))
@@ -118,27 +132,38 @@ class PostProcessing(RunEnvironment):
                     labels = xr.DataArray(labels, coords=(*coords, ["obs"]), dims=["index", "ahead", "type"])
                     labels.to_netcdf(file_name)
 
-    def calculate_bootstrap_skill_scores(self):
-
-        with TimeTracking(name="boot skill scores"):
+    def calculate_bootstrap_skill_scores(self) -> Dict[str, xr.DataArray]:
+        """
+        Use already created bootstrap predictions and the original predictions (the not-bootstrapped ones) and calculate
+        skill scores for the bootstraps. The result is saved as an xarray DataArray in a dictionary structure separated
+        for each station (keys of dictionary).
+        :return: The result dictionary with station-wise skill scores
+        """
+        with TimeTracking(name=inspect.stack()[0].function):
+            # extract all requirements from data store
             bootstrap_path = self.data_store.get("bootstrap_path", "general")
             forecast_path = self.data_store.get("forecast_path", "general")
             window_lead_time = self.data_store.get("window_lead_time", "general")
             number_of_bootstraps = self.data_store.get("number_of_bootstraps", "general.postprocessing")
 
             bootstraps = BootStraps(self.test_data, bootstrap_path, number_of_bootstraps)
 
-            # calc skill scores
             skill_scores = statistics.SkillScores(None)
             score = {}
             for station in self.test_data.stations:
                 logging.info(station)
+
+                # get station labels
                 file_name = os.path.join(forecast_path, f"bootstraps_labels_{station}.nc")
                 labels = xr.open_dataarray(file_name)
                 shape = labels.shape
+
+                # get original forecasts
                 orig = bootstraps.get_orig_prediction(forecast_path, f"forecasts_norm_{station}_test.nc").reshape(shape)
                 coords = (range(shape[0]), range(1, shape[1] + 1), ["orig"])
                 orig = xr.DataArray(orig, coords=coords, dims=["index", "ahead", "type"])
+
+                # calculate skill scores for each variable
                 skill = pd.DataFrame(columns=range(1, window_lead_time + 1))
                 for boot in self.test_data.variables:
                     file_name = os.path.join(forecast_path, f"bootstraps_{boot}_{station}.nc")
@@ -149,6 +174,8 @@ class PostProcessing(RunEnvironment):
                         data = boot_data.sel(ahead=ahead)
                         boot_scores.append(skill_scores.general_skill_score(data, forecast_name=boot, reference_name="orig"))
                     skill.loc[boot] = np.array(boot_scores)
+
+                # collect all results in single dictionary
                 score[station] = xr.DataArray(skill, dims=["boot_var", "ahead"])
 
             return score