diff --git a/src/run_modules/post_processing.py b/src/run_modules/post_processing.py index ce6d719f9d51bad00b2aea9f75b4f4dcdd722fa7..a95869e07857ea7a05c0fa667308f8e05e97de96 100644 --- a/src/run_modules/post_processing.py +++ b/src/run_modules/post_processing.py @@ -33,13 +33,15 @@ class PostProcessing(RunEnvironment): self.test_data: DataGenerator = self.data_store.get("generator", "general.test") self.test_data_distributed = Distributor(self.test_data, self.model, self.batch_size) self.train_data: DataGenerator = self.data_store.get("generator", "general.train") + self.train_val_data: DataGenerator = self.data_store.get("generator", "general.train_val") self.plot_path: str = self.data_store.get("plot_path", "general") - self.calculate_skill_scores() - self._run() + self.skill_scores = None + # self._run() def _run(self): self.train_ols_model() preds_for_all_stations = self.make_prediction() + self.skill_scores = self.calculate_skill_scores() #TODO: stopped here, continue with plot routine. Think about if skill score should be saved locally and loaded for plotting or if self.skill_scores should be used self.plot() def _load_model(self): @@ -68,6 +70,10 @@ class PostProcessing(RunEnvironment): plot_folder=self.plot_path) # plot_climsum_boxplot() + # FKf.ss_climsum_boxplot(data=ss_clim_dic, modelsetup=modelsetup, score_only=True, **kwargs) + # FKf.ss_climsum_boxplot(data=ss_clim_dic, modelsetup=modelsetup, score_only=False, extra_nametag='_all_terms', **kwargs) + # FKf.ss_sum_boxplot(ss_dic, plot_path, modelsetup) + def calculate_test_score(self): test_score = self.model.evaluate_generator(generator=self.test_data_distributed.distribute_on_batches(), use_multiprocessing=False, verbose=0, steps=1) @@ -197,59 +203,91 @@ class PostProcessing(RunEnvironment): res.loc[match_index, :, k] = v.sel({'datetime': match_index}).squeeze('Stations').transpose() return res + def _get_external_data(self, station): + try: + data = self.train_val_data.get_data_generator(station) + mean, std, transformation_method = data.get_transformation_information(variable='o3') + external_data = self._create_orig_forecast(data, None, mean, std, transformation_method) + external_data = external_data.squeeze("Stations").sel(window=1).drop(["window", "Stations", "variables"]) + return external_data.rename({'datetime': 'index'}) + except KeyError: + return None + def calculate_skill_scores(self, threshold=60): path = self.data_store.get("forecast_path", "general") + window_lead_time = self.data_store.get("window_lead_time", "general") + skill_score_general = {} + skill_score_climatological = {} for station in self.test_data.stations: # TODO: replace this by a more general approach to also calculate on train/val file = os.path.join(path, f"forecasts_{station}_test.nc") data = xr.open_dataarray(file) - ss = SkillScores() - ss.skill_scores(data, station, 3) - - - # get scaling parameters - # mean, std, transformation_method = self.test_data.get_transformation_information(variable='o3') - # tmp_nn = statistics.apply_inverse_transformation(tmp_nn, mean, std, transformation_method) + ss = SkillScores(data) + external_data = self._get_external_data(station) + skill_score_general[station] = ss.skill_scores(window_lead_time) + skill_score_climatological[station] = ss.climatological_skill_scores(external_data, window_lead_time) - # self.test_data.get_data_generator(station).restandardise( - # self.get_data_generator(station).data.sel(variables=self.target_var).squeeze('Stations'), - # variables=self.target_var) + return skill_score_general, skill_score_climatological class SkillScores(RunEnvironment): - def __init__(self): + def __init__(self, internal_data): super().__init__() + self.internal_data = internal_data + + def skill_scores(self, window_lead_time): + ahead_names = list(range(1, window_lead_time + 1)) + skill_score = pd.DataFrame(index=['cnn-persi', 'ols-persi', 'cnn-ols']) + for iahead in ahead_names: + data = self.internal_data.sel(ahead=iahead) + skill_score[iahead] = [self.general_skill_score(data), + self.general_skill_score(data, forecast_name="OLS"), + self.general_skill_score(data, reference_name="OLS")] + return skill_score - def skill_scores(self, data, station_name, window_lead_time): + def climatological_skill_scores(self, external_data, window_lead_time): ahead_names = list(range(1, window_lead_time + 1)) - all_terms_for_clim_deco = ['AI', 'AII', 'AIII', 'AIV', 'BI', 'BII', 'BIV', 'CI', 'CIV', 'CASE I', 'CASE II', - 'CASE III', 'CASE IV'] - ss_test_clim = xr.DataArray(np.full((len(all_terms_for_clim_deco), len(ahead_names)), np.nan), - coords=[all_terms_for_clim_deco, ahead_names], dims=['terms', 'ahead']) + all_terms = ['AI', 'AII', 'AIII', 'AIV', 'BI', 'BII', 'BIV', 'CI', 'CIV', 'CASE I', 'CASE II', 'CASE III', + 'CASE IV'] + skill_score = xr.DataArray(np.full((len(all_terms), len(ahead_names)), np.nan), coords=[all_terms], + dims=['terms', 'ahead']) for iahead in ahead_names: - ss_test_clim.loc[["CASE I", "AI", "BI", "CI"], iahead] = np.stack(self.skill_score_on_mean_squared_error( - data.sel(ahead=iahead), mu_type=1, forecast_name="CNN").values.flatten()) - - ss_test_clim.loc[["CASE II", "AII", "BII"], iahead] = np.stack(self.skill_score_on_mean_squared_error( - data.sel(ahead=iahead), mu_type=2, forecast_name="CNN").values.flatten()) - - # external_climatology = data.sel(variables="orig") - # - # ss_test_clim.loc[["CASE III", "AIII"], iahead] = np.stack(self.skill_score_on_mean_squared_error( - # data.loc[: iahead, :], mu_type=3, forecast_name="CNN", external_data=external_climatology.mean() - # ).values.flatten()) - # - # ss_test_clim.loc[["CASE IV", "AIV", "BIV", "CIV"], iahead] = np.stack(self.skill_score_on_mean_squared_error( - # data.loc[: iahead, :], mu_type=4, forecast_name="CNN", external_data=external_climatology.rename({'datetime': 'index', - # 'variables': 'type', - # 'Stations': 'ahead'})).values.flatten()) - - def skill_score_on_mean_squared_error(self, data, mu_type=1, observation_name="orig", forecast_name="CNN", external_data=None): + + data = self.internal_data.sel(ahead=iahead) + + skill_score.loc[["CASE I", "AI", "BI", "CI"], iahead] = np.stack(self._climatological_skill_score( + data, mu_type=1, forecast_name="CNN").values.flatten()) + + skill_score.loc[["CASE II", "AII", "BII"], iahead] = np.stack(self._climatological_skill_score( + data, mu_type=2, forecast_name="CNN").values.flatten()) + + if external_data is not None: + skill_score.loc[["CASE III", "AIII"], iahead] = np.stack(self._climatological_skill_score( + data, mu_type=3, forecast_name="CNN", + external_data=external_data).values.flatten()) + + skill_score.loc[["CASE IV", "AIV", "BIV", "CIV"], iahead] = np.stack(self._climatological_skill_score( + data, mu_type=4, forecast_name="CNN", + external_data=external_data).values.flatten()) + + return skill_score + + def _climatological_skill_score(self, data, mu_type=1, observation_name="orig", forecast_name="CNN", external_data=None): kwargs = {"external_data": external_data} if external_data is not None else {} return self.__getattribute__(f"skill_score_mu_case_{mu_type}")(data, observation_name, forecast_name, **kwargs) + @staticmethod + def general_skill_score(data, observation_name="orig", forecast_name="CNN", reference_name="persi"): + data = data.dropna("index") + observation = data.loc[..., observation_name] + forecast = data.loc[..., forecast_name] + reference = data.loc[..., reference_name] + mse = statistics.mean_squared_error + skill_score = 1 - mse(observation, forecast) / mse(observation, reference) + return skill_score + @staticmethod def skill_score_pre_calculations(data, observation_name, forecast_name): @@ -265,15 +303,16 @@ class SkillScores(RunEnvironment): CI = (((mean.loc[..., forecast_name] - mean.loc[..., observation_name]) / var.loc[ ..., observation_name]) ** 2).values - return AI, BI, CI, data + suffix = {"mean": mean, "var": var, "r": r, "p": p} + return AI, BI, CI, data, suffix def skill_score_mu_case_1(self, data, observation_name="orig", forecast_name="CNN"): - AI, BI, CI, data = self.skill_score_pre_calculations(data, observation_name, forecast_name) + AI, BI, CI, data, _ = self.skill_score_pre_calculations(data, observation_name, forecast_name) skill_score = np.array(AI - BI - CI) return pd.DataFrame({"skill_score": [skill_score], "AI": [AI], "BI": [BI], "CI": [CI]}).to_xarray().to_array() def skill_score_mu_case_2(self, data, observation_name="orig", forecast_name="CNN"): - AI, BI, CI, data = self.skill_score_pre_calculations(data, observation_name, forecast_name) + AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name) monthly_mean = self.create_monthly_mean_from_daily_data(data) data = xr.concat([data, monthly_mean], dim="type") var = data.var("index") @@ -283,33 +322,38 @@ class SkillScores(RunEnvironment): skill_score = np.array((AI - BI - CI - AII + BII) / (1 - AII + BII)) return pd.DataFrame({"skill_score": [skill_score], "AII": [AII], "BII": [BII]}).to_xarray().to_array() - def skill_score_mu_case_3(self, data, observation_name="orig", forecast_name="CNN"): - AI, BI, CI, data = self.skill_score_pre_calculations(data, observation_name, forecast_name) - pass + def skill_score_mu_case_3(self, data, observation_name="orig", forecast_name="CNN", external_data=None): + AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name) + mean, var = suffix["mean"], suffix["var"] + AIII = (((external_data.mean().values - mean.loc[observation_name]) / var.loc[observation_name])**2).values + skill_score = np.array((AI - BI - CI + AIII) / 1 + AIII) + return pd.DataFrame({"skill_score": [skill_score], "AIII": [AIII]}).to_xarray().to_array() + + def skill_score_mu_case_4(self, data, observation_name="orig", forecast_name="CNN", external_data=None): + AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name) + monthly_mean_external = self.create_monthly_mean_from_daily_data(external_data, columns=data.type.values) + data = xr.concat([data, monthly_mean_external], dim="type") + mean = data.mean("index") + var = data.var("index") - def skill_score_mu_case_4(self, data, observation_name="orig", forecast_name="CNN"): - AI, BI, CI, data = self.skill_score_pre_calculations(data, observation_name, forecast_name) - pass + r_mu, p_mu = stats.spearmanr(data.loc[..., [observation_name, observation_name+'X']]) - @staticmethod - def create_monthly_mean_from_daily_data(data, external_data=None, internal_mean=True): + AIV = np.array(r_mu**2) + BIV = ((r_mu - var.loc[observation_name + 'X'] / var.loc[observation_name])**2).values + CIV = (((mean.loc[observation_name + 'X'] - mean.loc[observation_name]) / var.loc[observation_name])**2).values + skill_score = np.array((AI - BI - CI - AIV + BIV + CIV) / (1 - AIV + BIV + CIV)) + return pd.DataFrame({"skill_score": [skill_score], "AIV": [AIV], "BIV": [BIV], "CIV": CIV}).to_xarray().to_array() - coordinates = [data.index, [v + "X" for v in list(data.type.values)]] # TODO - empty_data = np.full((len(data.index), len(data.type)), np.nan) + @staticmethod + def create_monthly_mean_from_daily_data(data, columns=None): + if columns is None: + columns = data.type.values + coordinates = [data.index, [v + "X" for v in list(columns)]] # TODO + empty_data = np.full((len(data.index), len(columns)), np.nan) monthly_mean = xr.DataArray(empty_data, coords=coordinates, dims=["index", "type"]) - if internal_mean: - mu = data.groupby("index.month").mean() - elif not internal_mean and isinstance(external_data, xr.DataArray): - mu = external_data.groupby("index.month").mean().drop("type").drop("ahead") - else: - raise AttributeError(f"Either choose internal_mean=True to calculate the internal mean or use internal_mean" - f"=False and isinstance(external_data, xarray.DataArray) to get the external mean " - f"depending on given external data. Given was internal_mean={internal_mean} and " - f"type(external_data)={type(external_data)} .") + mu = data.groupby("index.month").mean() for month in mu.month: monthly_mean[monthly_mean.index.dt.month == month, :] = mu[mu.month == month].values return monthly_mean - -