diff --git a/mlair/helpers/helpers.py b/mlair/helpers/helpers.py
index 4cc7310db32c2ef3bbdb9f70896a2f8455a974fc..1f5a86cde01752b74be82476e2e0fd8cad514a9e 100644
--- a/mlair/helpers/helpers.py
+++ b/mlair/helpers/helpers.py
@@ -81,8 +81,10 @@ def remove_items(obj: Union[List, Dict, Tuple], items: Any):

     def remove_from_list(list_obj, item_list):
         """Remove implementation for lists."""
-        if len(items) > 1:
+        if len(item_list) > 1:
             return [e for e in list_obj if e not in item_list]
+        elif len(item_list) == 0:
+            return list_obj
         else:
             list_obj = list_obj.copy()
             try:
diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py
index 87f780f9a6133edfcb2f9c71c2956b92f332e915..8b510f704396b35cf022089e7d94e037f4c62a2b 100644
--- a/mlair/helpers/statistics.py
+++ b/mlair/helpers/statistics.py
@@ -287,7 +287,7 @@ class SkillScores:
         combination_strings = [f"{first}-{second}" for (first, second) in combinations]
         return combinations, combination_strings

-    def skill_scores(self) -> pd.DataFrame:
+    def skill_scores(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
         """
         Calculate skill scores for all combinations of model names.

@@ -296,6 +296,7 @@ class SkillScores:
         ahead_names = list(self.external_data[self.ahead_dim].data)
         combinations, combination_strings = self.get_model_name_combinations()
         skill_score = pd.DataFrame(index=combination_strings)
+        count = pd.DataFrame(index=combination_strings)
         for iahead in ahead_names:
             data = self.external_data.sel({self.ahead_dim: iahead})
             skill_score[iahead] = [self.general_skill_score(data,
@@ -303,7 +304,12 @@ class SkillScores:
                                                             reference_name=second,
                                                             observation_name=self.observation_name)
                                    for (first, second) in combinations]
-        return skill_score
+            count[iahead] = [self.get_count(data,
+                                            forecast_name=first,
+                                            reference_name=second,
+                                            observation_name=self.observation_name)
+                             for (first, second) in combinations]
+        return skill_score, count

     def climatological_skill_scores(self, internal_data: Data, forecast_name: str) -> xr.DataArray:
         """
@@ -376,6 +382,21 @@ class SkillScores:
         skill_score = 1 - mse(observation, forecast) / mse(observation, reference)
         return skill_score.values

+    def get_count(self, data: Data, forecast_name: str, reference_name: str,
+                  observation_name: str = None) -> np.ndarray:
+        r"""
+        Count the number of valid (non-NaN) samples that enter the skill score calculation.
+
+        :param data: internal data containing data for observation, forecast and reference
+        :param observation_name: name of observation
+        :param forecast_name: name of forecast
+        :param reference_name: name of reference
+
+        :return: number of valid samples
+        """
+        data = data.dropna("index")
+        return data.count("index").max().values
+
     def skill_score_pre_calculations(self, data: Data, observation_name: str, forecast_name: str) -> Tuple[np.ndarray,
                                                                                                            np.ndarray,
                                                                                                            np.ndarray,
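The new `get_count` helper drops incomplete samples and counts what is left, so the count frame returned by `skill_scores()` reflects the sample size that actually enters each skill score. A minimal, self-contained sketch of that behaviour with synthetic data (dimension and type names are only illustrative):

```python
import numpy as np
import xarray as xr

# Synthetic stand-in for a per-station forecast array: dimension "index" (time)
# and "type" (model), with some missing values.
data = xr.DataArray(
    np.array([[1.0, 1.1, 0.9],
              [2.0, np.nan, 2.2],
              [3.0, 3.3, np.nan],
              [4.0, 4.1, 3.9]]),
    dims=["index", "type"],
    coords={"index": list(range(4)), "type": ["obs", "nn", "persi"]},
)

# Drop rows with any missing value first, then count what remains: only the
# complete rows contribute, which is exactly the sample size behind the score.
valid = data.dropna("index")
print(valid.count("index").max().values)  # -> 2
```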
diff --git a/mlair/plotting/postprocessing_plotting.py b/mlair/plotting/postprocessing_plotting.py
index 7ba84656cc00a89a7ec88efcc30bf665e0849e2f..5e8b121ae26ed0d4a967b9c440071d61c1ad703b 100644
--- a/mlair/plotting/postprocessing_plotting.py
+++ b/mlair/plotting/postprocessing_plotting.py
@@ -491,12 +491,12 @@ class PlotCompetitiveSkillScore(AbstractPlotClass):

     """

-    def __init__(self, data: pd.DataFrame, plot_folder=".", model_setup="NN"):
+    def __init__(self, data: Dict[str, pd.DataFrame], plot_folder=".", model_setup="NN"):
         """Initialise."""
         super().__init__(plot_folder, f"skill_score_competitive_{model_setup}")
         self._model_setup = model_setup
         self._labels = None
-        self._data = self._prepare_data(data)
+        self._data = self._prepare_data(helpers.remove_items(data, "total"))
         default_plot_name = self.plot_name
         # draw full detail plot
         self.plot_name = default_plot_name + "_full_detail"
diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py
index 56b5c363f15aa7f40a25bd02392dd9d85bf88396..72362c8770dce5febb52943d4dc871b3c11b3389 100644
--- a/mlair/run_modules/post_processing.py
+++ b/mlair/run_modules/post_processing.py
@@ -87,8 +87,10 @@ class PostProcessing(RunEnvironment):
         self.competitor_path = self.data_store.get("competitor_path")
         self.competitors = to_list(self.data_store.get_default("competitors", default=[]))
         self.forecast_indicator = "nn"
+        self.observation_indicator = "obs"
         self.ahead_dim = "ahead"
         self.boot_var_dim = "boot_var"
+        self.model_type_dim = "type"
         self._run()

     def _run(self):
@@ -115,10 +117,11 @@ class PostProcessing(RunEnvironment):

         # skill scores and error metrics
         with TimeTracking(name="calculate skill scores"):
-            skill_score_competitive, skill_score_climatological, errors = self.calculate_error_metrics()
+            skill_score_competitive, _, skill_score_climatological, errors = self.calculate_error_metrics()
             self.skill_scores = (skill_score_competitive, skill_score_climatological)
         self.report_error_metrics(errors)
-        self.report_error_metrics(skill_score_climatological)
+        self.report_error_metrics({self.forecast_indicator: skill_score_climatological})
+        self.report_error_metrics({"skill_score": skill_score_competitive})

         # plotting
         self.plot()
@@ -142,7 +145,7 @@ class PostProcessing(RunEnvironment):
             except (FileNotFoundError, KeyError):
                 logging.debug(f"No competitor found for combination '{station_name}' and '{competitor_name}'.")
                 continue
-        return xr.concat(competing_predictions, "type") if len(competing_predictions) > 0 else None
+        return xr.concat(competing_predictions, self.model_type_dim) if len(competing_predictions) > 0 else None

     def bootstrap_postprocessing(self, create_new_bootstraps: bool, _iter: int = 0, bootstrap_type="singleinput",
                                  bootstrap_method="shuffle") -> None:
@@ -190,7 +193,7 @@ class PostProcessing(RunEnvironment):
         # extract all requirements from data store
         forecast_path = self.data_store.get("forecast_path")
         number_of_bootstraps = self.data_store.get("number_of_bootstraps", "postprocessing")
-        dims = ["index", self.ahead_dim, "type"]
+        dims = ["index", self.ahead_dim, self.model_type_dim]
         for station in self.test_data:
             X, Y = None, None
             bootstraps = BootStraps(station, number_of_bootstraps, bootstrap_type=bootstrap_type,
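`PlotCompetitiveSkillScore` now receives the full `{station: DataFrame}` mapping of competitive skill scores, so the aggregated `"total"` entry has to be filtered out before the per-station data is prepared. A rough stand-in for what `helpers.remove_items(data, "total")` is expected to do at this call site (the actual helper lives in `mlair.helpers` and also handles lists and tuples; station names and values below are invented):

```python
from typing import Dict

import pandas as pd

def drop_total(data: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """Drop the aggregated 'total' entry; per-station frames stay untouched."""
    return {station: df for station, df in data.items() if station != "total"}

skill_scores = {"DEBW107": pd.DataFrame({1: [0.1]}, index=["nn-persi"]),
                "total": pd.DataFrame({1: [0.2]}, index=["nn-persi"])}
print(list(drop_total(skill_scores)))  # -> ['DEBW107']
```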
@@ -214,7 +217,7 @@ class PostProcessing(RunEnvironment):
                 # store also true labels for each station
                 labels = np.expand_dims(Y, axis=-1)
                 file_name = os.path.join(forecast_path, f"bootstraps_{station}_{bootstrap_method}_labels.nc")
-                labels = xr.DataArray(labels, coords=(*coords, ["obs"]), dims=dims)
+                labels = xr.DataArray(labels, coords=(*coords, [self.observation_indicator]), dims=dims)
                 labels.to_netcdf(file_name)

     def calculate_bootstrap_skill_scores(self, bootstrap_type, bootstrap_method) -> Dict[str, xr.DataArray]:
@@ -250,7 +253,7 @@ class PostProcessing(RunEnvironment):
             orig = self.get_orig_prediction(forecast_path, forecast_file % str(station), number_of_bootstraps)
             orig = orig.reshape(shape)
             coords = (range(shape[0]), range(1, shape[1] + 1), ["orig"])
-            orig = xr.DataArray(orig, coords=coords, dims=["index", self.ahead_dim, "type"])
+            orig = xr.DataArray(orig, coords=coords, dims=["index", self.ahead_dim, self.model_type_dim])

             # calculate skill scores for each variable
             skill = pd.DataFrame(columns=range(1, self.window_lead_time + 1))
@@ -516,7 +519,7 @@ class PostProcessing(RunEnvironment):
         full_index = self.create_fullindex(observation_data.indexes[time_dimension], self._get_frequency())
         prediction_dict = {self.forecast_indicator: nn_prediction,
                            "persi": persistence_prediction,
-                           "obs": observation,
+                           self.observation_indicator: observation,
                            "ols": ols_prediction}
         all_predictions = self.create_forecast_arrays(full_index, list(target_data.indexes[window_dim]),
                                                       time_dimension, ahead_dim=self.ahead_dim,
@@ -549,7 +552,7 @@ class PostProcessing(RunEnvironment):
         with xr.open_dataarray(file) as da:
             data = da.load()
         forecast = data.sel(type=[self.forecast_indicator])
-        forecast.coords["type"] = [competitor_name]
+        forecast.coords[self.model_type_dim] = [competitor_name]
         return forecast

     def _create_observation(self, data, _, transformation_func: Callable, normalised: bool) -> xr.DataArray:
@@ -731,18 +734,19 @@ class PostProcessing(RunEnvironment):
         except (IndexError, KeyError, FileNotFoundError):
             return None

-    @staticmethod
-    def _combine_forecasts(forecast, competitor, dim="type"):
+    def _combine_forecasts(self, forecast, competitor, dim=None):
         """
         Combine forecast and competitor if both are xarray.

         If competitor is None, this returns forecasts and vise versa.
         """
+        if dim is None:
+            dim = self.model_type_dim
         try:
             return xr.concat([forecast, competitor], dim=dim)
         except (TypeError, AttributeError):
             return forecast if competitor is None else competitor

-    def calculate_error_metrics(self) -> Tuple[Dict, Dict, Dict]:
+    def calculate_error_metrics(self) -> Tuple[Dict, Dict, Dict, Dict]:
         """
         Calculate error metrics and skill scores of NN forecast.
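`_combine_forecasts` only changes here so that the concatenation dimension defaults to `self.model_type_dim`; the fallback logic is untouched. A small self-contained sketch of that behaviour (written as a plain function rather than the bound method):

```python
import numpy as np
import xarray as xr

def combine_forecasts(forecast, competitor, dim="type"):
    """Concatenate along `dim` if both pieces exist, otherwise return the one that does."""
    try:
        return xr.concat([forecast, competitor], dim=dim)
    except (TypeError, AttributeError):
        return forecast if competitor is None else competitor

own = xr.DataArray(np.zeros((3, 1)), dims=["index", "type"], coords={"type": ["nn"]})
other = xr.DataArray(np.ones((3, 1)), dims=["index", "type"], coords={"type": ["competitor"]})
print(combine_forecasts(own, other).coords["type"].values)  # ['nn' 'competitor']
print(combine_forecasts(own, None).coords["type"].values)   # ['nn'], no competitor available
```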
@@ -755,6 +759,7 @@ class PostProcessing(RunEnvironment):
         path = self.data_store.get("forecast_path")
         all_stations = self.data_store.get("stations")
         skill_score_competitive = {}
+        skill_score_competitive_count = {}
         skill_score_climatological = {}
         errors = {}
         for station in all_stations:
@@ -762,24 +767,61 @@ class PostProcessing(RunEnvironment):

             # test errors
             if external_data is not None:
-                errors[station] = statistics.calculate_error_metrics(*map(lambda x: external_data.sel(type=x),
-                                                                          [self.forecast_indicator, "obs"]),
-                                                                     dim="index")
-            # skill score
+                model_type_list = external_data.coords[self.model_type_dim].values.tolist()
+                for model_type in remove_items(model_type_list, self.observation_indicator):
+                    if model_type not in errors.keys():
+                        errors[model_type] = {}
+                    errors[model_type][station] = statistics.calculate_error_metrics(
+                        *map(lambda x: external_data.sel(**{self.model_type_dim: x}),
+                             [model_type, self.observation_indicator]), dim="index")
+
+            # load competitors
             competitor = self.load_competitors(station)
-            combined = self._combine_forecasts(external_data, competitor, dim="type")
-            model_list = remove_items(list(combined.type.values), "obs") if combined is not None else None
+            combined = self._combine_forecasts(external_data, competitor, dim=self.model_type_dim)
+            if combined is not None:
+                model_list = remove_items(combined.coords[self.model_type_dim].values.tolist(),
+                                          self.observation_indicator)
+            else:
+                model_list = None
+
+            # test errors of competitors
+            for model_type in remove_items(model_list or [], list(errors.keys())):
+                if self.observation_indicator not in combined.coords[self.model_type_dim]:
+                    continue
+                if model_type not in errors.keys():
+                    errors[model_type] = {}
+                errors[model_type][station] = statistics.calculate_error_metrics(
+                    *map(lambda x: combined.sel(**{self.model_type_dim: x}),
+                         [model_type, self.observation_indicator]), dim="index")
+
+            # skill score
             skill_score = statistics.SkillScores(combined, models=model_list, ahead_dim=self.ahead_dim)
             if external_data is not None:
-                skill_score_competitive[station] = skill_score.skill_scores()
+                skill_score_competitive[station], skill_score_competitive_count[station] = skill_score.skill_scores()

             internal_data = self._get_internal_data(station, path)
             if internal_data is not None:
                 skill_score_climatological[station] = skill_score.climatological_skill_scores(
                     internal_data, forecast_name=self.forecast_indicator)

-        errors.update({"total": self.calculate_average_errors(errors)})
-        return skill_score_competitive, skill_score_climatological, errors
+        for model_type in errors.keys():
+            errors[model_type].update({"total": self.calculate_average_errors(errors[model_type])})
+        skill_score_competitive.update({"total": self.calculate_average_skill_scores(skill_score_competitive,
+                                                                                     skill_score_competitive_count)})
+        return skill_score_competitive, skill_score_competitive_count, skill_score_climatological, errors
+
+    @staticmethod
+    def calculate_average_skill_scores(scores, counts):
+        avg_skill_score = 0
+        n_total = None
+        for vals in counts.values():
+            n_total = vals if n_total is None else n_total.add(vals, fill_value=0)
+        for station, station_scores in scores.items():
+            n = counts.get(station)
+            avg_skill_score = station_scores.mul(n.div(n_total, fill_value=0), fill_value=0).add(avg_skill_score,
+                                                                                                 fill_value=0)
+        return avg_skill_score
+

     @staticmethod
     def calculate_average_errors(errors):
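The new `"total"` entry of the competitive skill scores is a count-weighted average: each station's scores are weighted by the number of samples behind them, which is what `calculate_average_skill_scores` computes from the two frames returned by `SkillScores.skill_scores()`. A self-contained sketch of that weighting with invented station names and numbers:

```python
import pandas as pd

# Per-station skill scores and sample counts, indexed by model combination and
# with one column per lead time (all values invented).
scores = {"DEBW107": pd.DataFrame({1: [0.20], 2: [0.10]}, index=["nn-persi"]),
          "DEBW013": pd.DataFrame({1: [0.40], 2: [0.30]}, index=["nn-persi"])}
counts = {"DEBW107": pd.DataFrame({1: [100], 2: [100]}, index=["nn-persi"]),
          "DEBW013": pd.DataFrame({1: [300], 2: [100]}, index=["nn-persi"])}

# Total number of samples per combination and lead time.
n_total = None
for vals in counts.values():
    n_total = vals if n_total is None else n_total.add(vals, fill_value=0)

# Weight each station by its share of the total count and sum up.
avg_skill_score = 0
for station, station_scores in scores.items():
    weight = counts[station].div(n_total, fill_value=0)
    avg_skill_score = station_scores.mul(weight, fill_value=0).add(avg_skill_score, fill_value=0)

print(avg_skill_score)  # lead 1: (0.2*100 + 0.4*300) / 400 = 0.35; lead 2: 0.20
```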
@@ -796,7 +838,7 @@ class PostProcessing(RunEnvironment):
         """Create a csv file containing all results from bootstrapping."""
         report_path = os.path.join(self.data_store.get("experiment_path"), "latex_report")
         path_config.check_path_and_create(report_path)
-        res = [["type", "method", "station", self.boot_var_dim, self.ahead_dim, "vals"]]
+        res = [[self.model_type_dim, "method", "station", self.boot_var_dim, self.ahead_dim, "vals"]]
         for boot_type, d0 in results.items():
             for boot_method, d1 in d0.items():
                 for station_name, vals in d1.items():
@@ -812,25 +854,35 @@ class PostProcessing(RunEnvironment):
     def report_error_metrics(self, errors):
         report_path = os.path.join(self.data_store.get("experiment_path"), "latex_report")
         path_config.check_path_and_create(report_path)
-        metric_collection = {}
-        for station, station_errors in errors.items():
-            if isinstance(station_errors, xr.DataArray):
-                dim = station_errors.dims[0]
-                sel_index = [sel for sel in station_errors.coords[dim] if "CASE" in str(sel)]
-                station_errors = {str(i.values): station_errors.sel(**{dim: i}) for i in sel_index}
-            for metric, vals in station_errors.items():
-                if metric == "n":
-                    continue
-                pd_vals = pd.DataFrame.from_dict({station: vals}).T
-                pd_vals.columns = [f"{metric}(t+{x})" for x in vals.coords["ahead"].values]
-                mc = metric_collection.get(metric, pd.DataFrame())
-                mc = mc.append(pd_vals)
-                metric_collection[metric] = mc
-        for metric, error_df in metric_collection.items():
-            df = error_df.sort_index()
-            if "total" in df.index:
-                df.reindex(df.index.drop(["total"]).to_list() + ["total"], )
-            column_format = tables.create_column_format_for_tex(df)
-            file_name = f"error_report_{metric}.%s".replace(' ', '_')
-            tables.save_to_tex(report_path, file_name % "tex", column_format=column_format, df=df)
-            tables.save_to_md(report_path, file_name % "md", df=df)
+        for model_type in errors.keys():
+            metric_collection = {}
+            for station, station_errors in errors[model_type].items():
+                if isinstance(station_errors, xr.DataArray):
+                    dim = station_errors.dims[0]
+                    sel_index = [sel for sel in station_errors.coords[dim] if "CASE" in str(sel)]
+                    station_errors = {str(i.values): station_errors.sel(**{dim: i}) for i in sel_index}
+                elif isinstance(station_errors, pd.DataFrame):
+                    sel_index = station_errors.index.tolist()
+                    ahead = station_errors.columns.values
+                    station_errors = {k: xr.DataArray(station_errors[station_errors.index == k].values.flatten(),
+                                                      dims=["ahead"], coords={"ahead": ahead}).astype(float)
+                                      for k in sel_index}
+                for metric, vals in station_errors.items():
+                    if metric == "n":
+                        metric = "count"
+                    pd_vals = pd.DataFrame.from_dict({station: vals}).T
+                    pd_vals.columns = [f"{metric}(t+{x})" for x in vals.coords["ahead"].values]
+                    mc = metric_collection.get(metric, pd.DataFrame())
+                    mc = mc.append(pd_vals)
+                    metric_collection[metric] = mc
+            for metric, error_df in metric_collection.items():
+                df = error_df.sort_index()
+                if "total" in df.index:
+                    df = df.reindex(df.index.drop(["total"]).to_list() + ["total"])
+                column_format = tables.create_column_format_for_tex(df)
+                if model_type == "skill_score":
+                    file_name = f"error_report_{model_type}_{metric}.%s".replace(' ', '_')
+                else:
+                    file_name = f"error_report_{metric}_{model_type}.%s".replace(' ', '_')
+                tables.save_to_tex(report_path, file_name % "tex", column_format=column_format, df=df)
+                tables.save_to_md(report_path, file_name % "md", df=df)
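Because the competitive skill scores now reach `report_error_metrics` as plain DataFrames (under the `"skill_score"` key), the method gains a branch that unpacks each station's frame into per-combination DataArrays over the lead-time axis before the report tables are built. Roughly, with invented values:

```python
import pandas as pd
import xarray as xr

# One station's competitive skill scores: rows are model combinations,
# columns are lead times.
station_errors = pd.DataFrame({1: [0.35, 0.12], 2: [0.20, 0.05]},
                              index=["nn-persi", "nn-ols"])
ahead = station_errors.columns.values
as_arrays = {k: xr.DataArray(station_errors[station_errors.index == k].values.flatten(),
                             dims=["ahead"], coords={"ahead": ahead}).astype(float)
             for k in station_errors.index.tolist()}
print(as_arrays["nn-persi"].sel(ahead=1).values)  # -> 0.35
```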