From b38104d1161c173a9d84fee7b50ed0d8e1359b08 Mon Sep 17 00:00:00 2001
From: leufen1 <l.leufen@fz-juelich.de>
Date: Mon, 22 Feb 2021 14:17:46 +0100
Subject: [PATCH] changed logging messages, skill scores are adjusted to work
 with None data

---
 mlair/helpers/statistics.py          | 16 ++++++------
 mlair/run_modules/post_processing.py | 39 +++++++++++++++++++---------
 mlair/run_modules/pre_processing.py  |  6 ++---
 3 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py
index ea5a9f05..ad6a368f 100644
--- a/mlair/helpers/statistics.py
+++ b/mlair/helpers/statistics.py
@@ -311,7 +311,6 @@ class SkillScores:
 
         for iahead in ahead_names:
             data = internal_data.sel(ahead=iahead)
-            external_data = self.external_data.sel(ahead=iahead, type=[self.observation_name])
 
             skill_score.loc[["CASE I", "AI", "BI", "CI"], iahead] = np.stack(self._climatological_skill_score(
                 data, mu_type=1, forecast_name=forecast_name, observation_name=self.observation_name).values.flatten())
@@ -319,14 +318,15 @@ class SkillScores:
             skill_score.loc[["CASE II", "AII", "BII"], iahead] = np.stack(self._climatological_skill_score(
                 data, mu_type=2, forecast_name=forecast_name, observation_name=self.observation_name).values.flatten())
 
-            # if external_data is not None:
-            skill_score.loc[["CASE III", "AIII"], iahead] = np.stack(self._climatological_skill_score(
-                data, mu_type=3, forecast_name=forecast_name, observation_name=self.observation_name,
-                external_data=external_data).values.flatten())
+            if self.external_data is not None:
+                external_data = self.external_data.sel(ahead=iahead, type=[self.observation_name])
+                skill_score.loc[["CASE III", "AIII"], iahead] = np.stack(self._climatological_skill_score(
+                    data, mu_type=3, forecast_name=forecast_name, observation_name=self.observation_name,
+                    external_data=external_data).values.flatten())
 
-            skill_score.loc[["CASE IV", "AIV", "BIV", "CIV"], iahead] = np.stack(self._climatological_skill_score(
-                data, mu_type=4, forecast_name=forecast_name, observation_name=self.observation_name,
-                external_data=external_data).values.flatten())
+                skill_score.loc[["CASE IV", "AIV", "BIV", "CIV"], iahead] = np.stack(self._climatological_skill_score(
+                    data, mu_type=4, forecast_name=forecast_name, observation_name=self.observation_name,
+                    external_data=external_data).values.flatten())
 
         return skill_score
 
diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py
index c810d3c5..127066b8 100644
--- a/mlair/run_modules/post_processing.py
+++ b/mlair/run_modules/post_processing.py
@@ -407,7 +407,7 @@ class PostProcessing(RunEnvironment):
             be found inside `forecast_path`.
""" subset_type = subset.name - logging.debug(f"start make_prediction for {subset_type}") + logging.info(f"start make_prediction for {subset_type}") time_dimension = self.data_store.get("time_dim") window_dim = self.data_store.get("window_dim") subset_type = subset.name @@ -627,7 +627,7 @@ class PostProcessing(RunEnvironment): try: file = os.path.join(path, f"forecasts_{str(station)}_train_val.nc") return xr.open_dataarray(file) - except (IndexError, KeyError): + except (IndexError, KeyError, FileNotFoundError): return None def _get_external_data(self, station: str, path: str) -> Union[xr.DataArray, None]: @@ -642,9 +642,20 @@ class PostProcessing(RunEnvironment): try: file = os.path.join(path, f"forecasts_{str(station)}_test.nc") return xr.open_dataarray(file) - except (IndexError, KeyError): + except (IndexError, KeyError, FileNotFoundError): return None + @staticmethod + def _combine_forecasts(forecast, competitor, dim="type"): + """ + Combine forecast and competitor if both are xarray. If competitor is None, this returns forecasts and vise + versa. + """ + try: + return xr.concat([forecast, competitor], dim=dim) + except (TypeError, AttributeError): + return forecast if competitor is None else competitor + def calculate_skill_scores(self) -> Tuple[Dict, Dict]: """ Calculate skill scores of NN forecast. @@ -656,16 +667,20 @@ class PostProcessing(RunEnvironment): :return: competitive and climatological skill scores """ path = self.data_store.get("forecast_path") + all_stations = self.data_store.get("stations") skill_score_competitive = {} skill_score_climatological = {} - for station in self.test_data: + for station in all_stations: external_data = self._get_external_data(station, path) - competitor = self.load_competitors(str(station)) - combined = xr.concat([external_data, competitor], dim="type") if competitor is not None else external_data - skill_score = statistics.SkillScores(combined, models=remove_items(list(combined.type.values), "obs")) - skill_score_competitive[station] = skill_score.skill_scores(self.window_lead_time) - - internal_data = self._get_internal_data(station, path) # ToDo: check if external is still right? 
-            skill_score_climatological[station] = skill_score.climatological_skill_scores(
-                internal_data, self.window_lead_time, forecast_name=self.forecast_indicator)
+            competitor = self.load_competitors(station)
+            combined = self._combine_forecasts(external_data, competitor, dim="type")
+            model_list = remove_items(list(combined.type.values), "obs") if combined is not None else None
+            skill_score = statistics.SkillScores(combined, models=model_list)
+            if external_data is not None:
+                skill_score_competitive[station] = skill_score.skill_scores(self.window_lead_time)
+
+            internal_data = self._get_internal_data(station, path)
+            if internal_data is not None:
+                skill_score_climatological[station] = skill_score.climatological_skill_scores(
+                    internal_data, self.window_lead_time, forecast_name=self.forecast_indicator)
         return skill_score_competitive, skill_score_climatological
 
diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py
index f696b006..c40b20e2 100644
--- a/mlair/run_modules/pre_processing.py
+++ b/mlair/run_modules/pre_processing.py
@@ -269,8 +269,9 @@ class PreProcessing(RunEnvironment):
             output = [
                 pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs)
                 for station in set_stations]
-            for p in output:
+            for i, p in enumerate(output):
                 dh, s = p.get()
+                logging.info(f"...finished: {s} ({int((i + 1.) / len(output) * 100)}%)")
                 if dh is not None:
                     collection.add(dh)
                     valid_stations.append(s)
@@ -352,8 +353,7 @@ def f_proc(data_handler, station, name_affix, store, **kwargs):
     the station that was used. This function must be implemented globally to work together with multiprocessing.
     """
     try:
-        res = data_handler.build(station, name_affix=name_affix, store_processed_data=store,
-                                 **kwargs)
+        res = data_handler.build(station, name_affix=name_affix, store_processed_data=store, **kwargs)
    except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError) as e:
         logging.info(f"remove station {station} because it raised an error: {e}")
         res = None
--
GitLab
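Note on the loading change in post_processing.py: by also catching FileNotFoundError, a station without a stored
forecast file now yields None instead of aborting post-processing. A minimal stand-alone sketch of that pattern
(the directory, station id and helper name below are made up for illustration and are not MLAir code):

import os
import xarray as xr


def load_forecast_or_none(path, station, suffix="test"):
    """Open forecasts_<station>_<suffix>.nc if it exists, otherwise return None."""
    try:
        file = os.path.join(path, f"forecasts_{station}_{suffix}.nc")
        return xr.open_dataarray(file)
    except (IndexError, KeyError, FileNotFoundError):
        # Missing or malformed forecast file: signal "no data" instead of raising.
        return None


data = load_forecast_or_none("/tmp/forecasts", "DEBW107")  # hypothetical location and station
print("no test forecast found" if data is None else data.dims)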
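The `_combine_forecasts` helper and the reworked `calculate_skill_scores` follow the same idea: tolerate None
wherever a forecast, competitor, or reference may be missing and skip whatever cannot be computed. A rough sketch
of that flow with toy data; `toy_skill_score`, the random DataArrays and the station ids are placeholders, not
part of MLAir:

import numpy as np
import xarray as xr


def combine_forecasts(forecast, competitor, dim="type"):
    """Concatenate forecast and competitor along `dim`; tolerate None on either side."""
    try:
        return xr.concat([forecast, competitor], dim=dim)
    except (TypeError, AttributeError):
        # At least one input is not an xarray object (e.g. None): return the other one, or None.
        return forecast if competitor is None else competitor


def toy_skill_score(combined):
    """Stand-in for statistics.SkillScores: just a number derived from the combined data."""
    return float(combined.mean())


stations = ["DEBW107", "DEBW013"]   # hypothetical station ids
forecasts = {
    "DEBW107": xr.DataArray(np.random.rand(3, 2), dims=("index", "type"),
                            coords={"type": ["nn", "obs"]}),
    "DEBW013": None,                # e.g. forecast file was missing on disk
}
competitors = {"DEBW107": None, "DEBW013": None}

skill_scores = {}
for station in stations:
    combined = combine_forecasts(forecasts[station], competitors[station])
    if combined is None:
        continue                    # nothing to evaluate for this station
    skill_scores[station] = toy_skill_score(combined)

print(skill_scores)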
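The pre_processing.py change adds a per-station progress message to the parallel build loop. A self-contained
sketch of the same pattern; `slow_build` and the station list below are placeholders standing in for f_proc and
the real station set:

import logging
import multiprocessing
import time

logging.basicConfig(level=logging.INFO)


def slow_build(station):
    """Stand-in for f_proc: pretend to build a data handler for one station."""
    time.sleep(0.1)
    return station.upper(), station  # (data handler placeholder, station name)


if __name__ == "__main__":
    stations = ["debw107", "debw013", "debw087"]
    with multiprocessing.Pool(2) as pool:
        output = [pool.apply_async(slow_build, args=(station,)) for station in stations]
        for i, p in enumerate(output):
            dh, s = p.get()  # dh would be the built data handler
            # Same idea as the patch: report completion as an integer percentage.
            logging.info(f"...finished: {s} ({int((i + 1.) / len(output) * 100)}%)")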