diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py
index a1e713a8c135800d02ff7c27894485a5da7fae37..d1259a45edb2cca97595554ebe2f5990595720af 100644
--- a/mlair/helpers/statistics.py
+++ b/mlair/helpers/statistics.py
@@ -256,13 +256,21 @@ class SkillScores:
     """
 
     models_default = ["cnn", "persi", "ols"]
-
+    '''
     def __init__(self, external_data: Union[Data, None], models=None, observation_name="obs", ahead_dim="ahead"):
         """Set internal data."""
         self.external_data = external_data
         self.models = self.set_model_names(models)
         self.observation_name = observation_name
         self.ahead_dim = ahead_dim
+    '''
+    def __init__(self, external_data: Data, models=None, observation_name="obs"):
+        """Set internal data."""
+        self.external_data = external_data
+        self.models = self.set_model_names(models)
+        self.observation_name = observation_name
+        self.ahead_dim = "ahead"
+
 
     def set_model_names(self, models: List[str]) -> List[str]:
         """Either use given models or use defaults."""
@@ -283,7 +291,7 @@ class SkillScores:
         combinations = list(itertools.combinations(self.models, 2))
         combination_strings = [f"{first}-{second}" for (first, second) in combinations]
         return combinations, combination_strings
-
+    '''
     def skill_scores(self) -> pd.DataFrame:
         """
         Calculate skill scores for all combinations of model names.
@@ -301,7 +309,28 @@ class SkillScores:
                                                             observation_name=self.observation_name)
                                    for (first, second) in combinations]
         return skill_score
+    '''
+    def skill_scores(self, window_lead_time: int, ahead_dim="ahead") -> pd.DataFrame:
+        """
+        Calculate skill scores for all combinations of model names.
+
+        :param window_lead_time: number of forecast steps
+
+        :return: skill score for each comparison and forecast step
+        """
+        ahead_names = list(self.external_data[ahead_dim].data)
+        combinations, combination_strings = self.get_model_name_combinations()
+        skill_score = pd.DataFrame(index=combination_strings)
+        for iahead in ahead_names:
+            data = self.external_data.sel(ahead=iahead)
+            skill_score[iahead] = [self.general_skill_score(data,
+                                                            forecast_name=first,
+                                                            reference_name=second,
+                                                            observation_name=self.observation_name)
+                                   for (first, second) in combinations]
+        return skill_score
+    '''
 
     def climatological_skill_scores(self, internal_data: Data, forecast_name: str) -> xr.DataArray:
         """
         Calculate climatological skill scores according to Murphy (1988).
@@ -341,6 +370,49 @@ class SkillScores:
                     external_data=external_data).values.flatten())
 
         return skill_score
+    '''
+    def climatological_skill_scores(self, internal_data: Data, window_lead_time: int,
+                                    forecast_name: str, ahead_dim="ahead") -> xr.DataArray:
+        """
+        Calculate climatological skill scores according to Murphy (1988).
+
+        Calculate all CASES I - IV and terms [ABC][I-IV]. External data has to be set by initialisation, internal data
+        is part of parameters.
+
+        :param internal_data: internal data
+        :param window_lead_time: time step of interest within the forecast horizon used to select data
+        :param forecast_name: name of the forecast to use for this calculation (must be available in `data`)
+
+        :return: all CASES as well as all terms
+        """
+        ahead_names = list(self.external_data[ahead_dim].data)
+
+        all_terms = ['AI', 'AII', 'AIII', 'AIV', 'BI', 'BII', 'BIV', 'CI', 'CIV', 'CASE I', 'CASE II', 'CASE III',
+                     'CASE IV']
+        skill_score = xr.DataArray(np.full((len(all_terms), len(ahead_names)), np.nan), coords=[all_terms, ahead_names],
+                                   dims=['terms', 'ahead'])
+
+        for iahead in ahead_names:
+            data = internal_data.sel(ahead=iahead)
+
+            skill_score.loc[["CASE I", "AI", "BI", "CI"], iahead] = np.stack(self._climatological_skill_score(
+                data, mu_type=1, forecast_name=forecast_name, observation_name=self.observation_name).values.flatten())
+
+            skill_score.loc[["CASE II", "AII", "BII"], iahead] = np.stack(self._climatological_skill_score(
+                data, mu_type=2, forecast_name=forecast_name, observation_name=self.observation_name).values.flatten())
+
+            if self.external_data is not None and self.observation_name in self.external_data.coords["type"]:
+                external_data = self.external_data.sel(ahead=iahead, type=[self.observation_name])
+                skill_score.loc[["CASE III", "AIII"], iahead] = np.stack(self._climatological_skill_score(
+                    data, mu_type=3, forecast_name=forecast_name, observation_name=self.observation_name,
+                    external_data=external_data).values.flatten())
+
+                skill_score.loc[["CASE IV", "AIV", "BIV", "CIV"], iahead] = np.stack(self._climatological_skill_score(
+                    data, mu_type=4, forecast_name=forecast_name, observation_name=self.observation_name,
+                    external_data=external_data).values.flatten())
+
+        return skill_score
+
 
     def _climatological_skill_score(self, internal_data, observation_name, forecast_name, mu_type=1,
                                     external_data=None):
@@ -369,7 +441,7 @@ class SkillScores:
         mse = mean_squared_error
         skill_score = 1 - mse(observation, forecast) / mse(observation, reference)
         return skill_score.values
-
+    '''
     def skill_score_pre_calculations(self, data: Data, observation_name: str, forecast_name: str) -> Tuple[np.ndarray,
                                                                                                             np.ndarray,
                                                                                                             np.ndarray,
@@ -395,6 +467,41 @@ class SkillScores:
         sigma = np.sqrt(data.var("index"))
         r, p = stats.pearsonr(*[data.sel(type=n).values.flatten() for n in [forecast_name, observation_name]])
 
+        AI = np.array(r ** 2)
+        BI = ((r - (sigma.sel(type=forecast_name, drop=True) / sigma.sel(type=observation_name,
+                                                                         drop=True))) ** 2).values
+        CI = (((mean.sel(type=forecast_name, drop=True) - mean.sel(type=observation_name, drop=True)) / sigma.sel(
+            type=observation_name, drop=True)) ** 2).values
+
+        suffix = {"mean": mean, "sigma": sigma, "r": r, "p": p}
+        return AI, BI, CI, data, suffix
+    '''
+
+    @staticmethod
+    def skill_score_pre_calculations(data: Data, observation_name: str, forecast_name: str) -> Tuple[np.ndarray,
+                                                                                                      np.ndarray,
+                                                                                                      np.ndarray,
+                                                                                                      Data,
+                                                                                                      Dict[str, Data]]:
+        """
+        Calculate terms AI, BI, and CI, as well as mean, variance and Pearson's correlation, and clean up data.
+
+        The additional information on mean, variance and Pearson's correlation (and the p-value) is returned as a
+        dictionary with the corresponding keys mean, sigma, r and p.
+
+        :param data: internal data to use for calculations
+        :param observation_name: name of observation
+        :param forecast_name: name of forecast
+
+        :returns: terms AI, BI, and CI, internal data without NaNs, and mean, variance, correlation and its p-value
+        """
+        data = data.sel(type=[observation_name, forecast_name]).drop("ahead")
+        data = data.dropna("index")
+
+        mean = data.mean("index")
+        sigma = np.sqrt(data.var("index"))
+        r, p = stats.pearsonr(*[data.sel(type=n).values.flatten() for n in [forecast_name, observation_name]])
+
         AI = np.array(r ** 2)
         BI = ((r - (sigma.sel(type=forecast_name, drop=True) / sigma.sel(type=observation_name,
                                                                          drop=True))) ** 2).values
diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py
index 4defc4d6439b720d61ce4617ced178531a921f50..c01bee7f1e797deafb42942ec3ee269689ac7e68 100644
--- a/mlair/run_modules/post_processing.py
+++ b/mlair/run_modules/post_processing.py
@@ -220,7 +220,7 @@ class PostProcessing(RunEnvironment):
                 file_name = os.path.join(forecast_path, f"bootstraps_{station}_{bootstrap_method}_labels.nc")
                 labels = xr.DataArray(labels, coords=(*coords, ["obs"]), dims=dims)
                 labels.to_netcdf(file_name)
-
+    '''
    def calculate_bootstrap_skill_scores(self, bootstrap_type, bootstrap_method) -> Dict[str, xr.DataArray]:
        """
        Calculate skill score of bootstrapped variables.
@@ -275,6 +275,62 @@ class PostProcessing(RunEnvironment):
            # collect all results in single dictionary
            score[str(station)] = xr.DataArray(skill, dims=["boot_var", self.ahead_dim])
        return score
+    '''
+    def calculate_bootstrap_skill_scores(self, bootstrap_type, bootstrap_method) -> Dict[str, xr.DataArray]:
+        """
+        Calculate skill score of bootstrapped variables.
+
+        Use the already created bootstrap predictions and the original (non-bootstrapped) predictions to calculate
+        skill scores for the bootstraps. The result is saved as an xarray DataArray in a dictionary structure separated
+        for each station (keys of dictionary).
+
+        :return: The result dictionary with station-wise skill scores
+        """
+        with TimeTracking(name=f"{inspect.stack()[0].function} ({bootstrap_type}, {bootstrap_method})"):
+            # extract all requirements from data store
+            forecast_path = self.data_store.get("forecast_path")
+            number_of_bootstraps = self.data_store.get("number_of_bootstraps", "postprocessing")
+            forecast_file = f"forecasts_norm_%s_test.nc"
+
+            bootstraps = BootStraps(self.test_data[0], number_of_bootstraps, bootstrap_type=bootstrap_type,
+                                    bootstrap_method=bootstrap_method)
+            number_of_bootstraps = bootstraps.number_of_bootstraps
+            bootstrap_iter = bootstraps.bootstraps()
+            skill_scores = statistics.SkillScores(None)
+            score = {}
+            for station in self.test_data:
+                # get station labels
+                file_name = os.path.join(forecast_path, f"bootstraps_{str(station)}_{bootstrap_method}_labels.nc")
+                with xr.open_dataarray(file_name) as da:
+                    labels = da.load()
+                shape = labels.shape
+
+                # get original forecasts
+                orig = self.get_orig_prediction(forecast_path, forecast_file % str(station), number_of_bootstraps)
+                orig = orig.reshape(shape)
+                coords = (range(shape[0]), range(1, shape[1] + 1), ["orig"])
+                orig = xr.DataArray(orig, coords=coords, dims=["index", self.ahead_dim, "type"])
+
+                # calculate skill scores for each variable
+                skill = pd.DataFrame(columns=range(1, self.window_lead_time + 1))
+                for boot_set in bootstrap_iter:
+                    boot_var = f"{boot_set[0]}_{boot_set[1]}" if isinstance(boot_set, tuple) else str(boot_set)
+                    file_name = os.path.join(forecast_path,
+                                             f"bootstraps_{station}_{boot_var}_{bootstrap_type}_{bootstrap_method}.nc")
+                    with xr.open_dataarray(file_name) as da:
+                        boot_data = da.load()
+                    boot_data = boot_data.combine_first(labels).combine_first(orig)
+                    boot_scores = []
+                    for ahead in range(1, self.window_lead_time + 1):
+                        data = boot_data.sel({self.ahead_dim: ahead})
+                        boot_scores.append(
+                            skill_scores.general_skill_score(data, forecast_name=boot_var, reference_name="orig"))
+                    skill.loc[boot_var] = np.array(boot_scores)
+
+                # collect all results in single dictionary
+                score[str(station)] = xr.DataArray(skill, dims=["boot_var", self.ahead_dim])
+            return score
+
 
     def get_orig_prediction(self, path, file_name, number_of_bootstraps, prediction_name=None):
         if prediction_name is None:
@@ -749,7 +805,7 @@ class PostProcessing(RunEnvironment):
             return xr.concat([forecast, competitor], dim=dim)
         except (TypeError, AttributeError):
             return forecast if competitor is None else competitor
-
+    '''
    def calculate_error_metrics(self) -> Tuple[Dict, Dict, Dict]:
        """
        Calculate error metrics and skill scores of NN forecast.
@@ -788,6 +844,45 @@ class PostProcessing(RunEnvironment):
 
        errors.update({"total": self.calculate_average_errors(errors)})
        return skill_score_competitive, skill_score_climatological, errors
+    '''
+    def calculate_error_metrics(self) -> Tuple[Dict, Dict, Dict]:
+        """
+        Calculate error metrics and skill scores of NN forecast.
+
+        The competitive skill score compares the NN prediction with persistence and ordinary least squares forecasts,
+        whereas the climatological skill score evaluates the NN prediction in terms of meaningfulness in comparison
+        to different climatological references.
+
+        :return: competitive and climatological skill scores, error metrics
+        """
+        path = self.data_store.get("forecast_path")
+        all_stations = self.data_store.get("stations")
+        skill_score_competitive = {}
+        skill_score_climatological = {}
+        errors = {}
+        for station in all_stations:
+            external_data = self._get_external_data(station, path)  # test data
+
+            # test errors
+            if external_data is not None:
+                errors[station] = statistics.calculate_error_metrics(*map(lambda x: external_data.sel(type=x),
+                                                                          [self.forecast_indicator, "obs"]),
+                                                                     dim="index")
+            # skill score
+            competitor = self.load_competitors(station)
+            combined = self._combine_forecasts(external_data, competitor, dim="type")
+            model_list = remove_items(list(combined.type.values), "obs") if combined is not None else None
+            skill_score = statistics.SkillScores(combined, models=model_list)
+            if external_data is not None:
+                skill_score_competitive[station] = skill_score.skill_scores(self.window_lead_time)
+
+            internal_data = self._get_internal_data(station, path)
+            if internal_data is not None:
+                skill_score_climatological[station] = skill_score.climatological_skill_scores(
+                    internal_data, self.window_lead_time, forecast_name=self.forecast_indicator)
+
+        errors.update({"total": self.calculate_average_errors(errors)})
+        return skill_score_competitive, skill_score_climatological, errors
 
     @staticmethod
     def calculate_average_errors(errors):
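Not part of the patch: a minimal usage sketch of the restored SkillScores interface, mirroring how calculate_error_metrics and calculate_bootstrap_skill_scores call it above. The synthetic DataArray and the model names ("cnn", "ols") are hypothetical stand-ins for the per-station combined forecast array; the climatological variant additionally needs external reference data and is therefore omitted here.

import numpy as np
import xarray as xr
from mlair.helpers import statistics

# Hypothetical combined forecast array: 10 samples, 3 forecast steps, observations plus two models.
data = xr.DataArray(np.random.default_rng(0).random((10, 3, 3)),
                    coords=[range(10), [1, 2, 3], ["obs", "cnn", "ols"]],
                    dims=["index", "ahead", "type"])

skill_scores = statistics.SkillScores(data, models=["cnn", "ols"])
# Competitive skill score (cnn vs. ols) for each forecast step, returned as a pandas DataFrame.
competitive = skill_scores.skill_scores(window_lead_time=3)
# Single comparison for one forecast step, as used inside calculate_bootstrap_skill_scores.
single = skill_scores.general_skill_score(data.sel(ahead=1), forecast_name="cnn", reference_name="ols")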