diff --git a/mlair/data_handler/abstract_data_handler.py b/mlair/data_handler/abstract_data_handler.py index dd7bafc676ab7b8781b81300d465e141a875c895..aafdb80c3455d2f659e5a952d81c1a0b841eea2e 100644 --- a/mlair/data_handler/abstract_data_handler.py +++ b/mlair/data_handler/abstract_data_handler.py @@ -99,6 +99,9 @@ class AbstractDataHandler(object): """Return coordinates as dictionary with keys `lon` and `lat`.""" return None + def get_wind_upstream_sector_by_name(self): + raise NotImplementedError + def _hash_list(self): return [] diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py index 19a4893d49c7702cd092858c5c885453e974cbc1..33dc05b6612b436665efb3b3960aed07a00faf52 100644 --- a/mlair/helpers/statistics.py +++ b/mlair/helpers/statistics.py @@ -10,6 +10,7 @@ import xarray as xr import pandas as pd from typing import Union, Tuple, Dict, List import itertools +import dask.array as da Data = Union[xr.DataArray, pd.DataFrame] @@ -202,7 +203,12 @@ def log_apply(data: Data, mean: Data, std: Data) -> Data: def mean_squared_error(a, b, dim=None): """Calculate mean squared error.""" - return np.square(a - b).mean(dim) + try: + return da.square(a - b).mean(dim, skipna=True) + except TypeError: + return da.square(a - b).mean(dim) + except Exception: + return np.square(a - b).mean(dim) def mean_absolute_error(a, b, dim=None): @@ -218,6 +224,16 @@ def calculate_error_metrics(a, b, dim): n = (a - b).notnull().sum(dim) return {"mse": mse, "rmse": rmse, "mae": mae, "n": n} +def skill_score_based_on_mse(data: xr.DataArray, obs_name: str, pred_name: str, ref_name: str, + aggregation_dim: str = "index", competitor_dim: str = "type") -> xr.DataArray: + obs = data.sel({competitor_dim: obs_name}) + pred = data.sel({competitor_dim: pred_name}) + ref = data.sel({competitor_dim: ref_name}) + ss = 1 - mean_squared_error(obs, pred, dim=aggregation_dim) / mean_squared_error(obs, ref, dim=aggregation_dim) + return ss + + + class SkillScores: r""" diff --git a/mlair/plotting/postprocessing_plotting.py b/mlair/plotting/postprocessing_plotting.py index f975c72f4542f6dac560b7fa4f564974f3615fba..1e33d34dfa1d2b437ca6fe1bbf2f38aabb2c202e 100644 --- a/mlair/plotting/postprocessing_plotting.py +++ b/mlair/plotting/postprocessing_plotting.py @@ -7,7 +7,7 @@ import math import os import sys import warnings -from typing import Dict, List, Tuple +from typing import Dict, List, Tuple, Union import matplotlib import matplotlib.pyplot as plt @@ -511,20 +511,13 @@ class PlotCompetitiveSkillScore(AbstractPlotClass): # pragma: no cover #<<<<<<< HEAD def __init__(self, data: pd.DataFrame, plot_folder=".", model_setup="NN", sampling="daily", model_name_for_plots=None): -#======= -# def __init__(self, data: Dict[str, pd.DataFrame], plot_folder=".", model_setup="NN"): -#>>>>>>> develop """Initialise.""" super().__init__(plot_folder, f"skill_score_competitive_{model_setup}") self._model_setup = model_setup self._sampling = self._get_sampling(sampling) self._labels = None -#<<<<<<< HEAD self._model_name_for_plots = model_name_for_plots self._data = self._prepare_data(data) -#======= -# self._data = self._prepare_data(helpers.remove_items(data, "total")) -#>>>>>>> develop default_plot_name = self.plot_name # draw full detail plot self.plot_name = default_plot_name + "_full_detail" @@ -621,6 +614,72 @@ class PlotCompetitiveSkillScore(AbstractPlotClass): # pragma: no cover return lower, upper +class PlotSectorialSkillScore(AbstractPlotClass): # pragma: no cover + + def __init__(self, data: xr.DataArray, plot_folder: str = ".", model_setup: str = "NN", sampling: str = "daily", + model_name_for_plots: Union[str, None] = None, ahead_dim: str = "ahead"): + """Initialise.""" + super().__init__(plot_folder, f"skill_score_sectorial_{model_setup}") + self._model_setup = model_setup + self._sampling = self._get_sampling(sampling) + self._ahead_dim = ahead_dim + self._labels = None + self._model_name_for_plots = model_name_for_plots + self._data = self._prepare_data(data) + self._plot() + self._save() + self.plot_name = self.plot_name + "_vertical" + self._plot_vertical() + self._save() + + def _prepare_data(self, data: xr.DataArray): + self._labels = [str(i) + self._sampling for i in data.coords[self._ahead_dim].values] + data = data.to_dataframe("data")[["data"]].stack(level=0).reset_index(level=3, drop=True).reset_index(name="data") + return data + + def _plot(self): + size = max([len(np.unique(self._data.sector)), 6]) + fig, ax = plt.subplots(figsize=(size, size * 0.8)) + data = self._data + sns.boxplot(x="sector", y="data", hue="ahead", data=data, whis=1.5, ax=ax, palette="Blues_d", + showmeans=True, meanprops={"markersize": 3, "markeredgecolor": "k"}, flierprops={"marker": "."}, + ) + ax.axhline(y=0, color="grey", linewidth=.5) + ax.set(ylabel="skill score", xlabel="sector", title="summary of all stations") + handles, _ = ax.get_legend_handles_labels() + plt.xticks(rotation=45, horizontalalignment="right") + ax.legend(handles, self._labels) + plt.tight_layout() + + def _plot_vertical(self): + """Plot skill scores of the comparisons, but vertically aligned.""" + fig, ax = plt.subplots() + data = self._data + sns.boxplot(y="sector", x="data", hue="ahead", data=data, whis=1.5, ax=ax, palette="Blues_d", + showmeans=True, meanprops={"markersize": 3, "markeredgecolor": "k"}, flierprops={"marker": "."}, + ) + ax.axvline(x=0, color="grey", linewidth=.5) + ax.set(xlabel="skill score", ylabel="sector", title="summary of all stations") + handles, _ = ax.get_legend_handles_labels() + ax.legend(handles, self._labels) + plt.tight_layout() + + @staticmethod + def _lim(data) -> Tuple[float, float]: + """ + Calculate axis limits from data (Can be used to set axis extend). + + Lower limit is the minimum of 0 and data's minimum (reduced by small subtrahend) and upper limit is data's + maximum (increased by a small addend). + + :return: + """ + limit = 5 + lower = np.max([-limit, np.min([0, helpers.float_round(data["data"].min(), 2) - 0.1])]) + upper = np.min([limit, helpers.float_round(data.max()[2], 2) + 0.1]) + return lower, upper + + @TimeTrackingWrapper class PlotFeatureImportanceSkillScore(AbstractPlotClass): # pragma: no cover """ diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py index 1f1d7e7c5d05d1e0e2fb0100e9468aaa2c551bc2..d0646de163484706a66387197260dc891353dcde 100644 --- a/mlair/run_modules/post_processing.py +++ b/mlair/run_modules/post_processing.py @@ -14,6 +14,7 @@ import tensorflow.keras as keras import numpy as np import pandas as pd import xarray as xr +import dask.array as da from mlair.configuration import path_config from mlair.data_handler import Bootstraps, KerasIterator @@ -23,7 +24,7 @@ from mlair.model_modules.linear_model import OrdinaryLeastSquaredModel from mlair.model_modules import AbstractModelClass from mlair.plotting.postprocessing_plotting import PlotMonthlySummary, PlotClimatologicalSkillScore, \ PlotCompetitiveSkillScore, PlotTimeSeries, PlotFeatureImportanceSkillScore, PlotConditionalQuantiles, \ - PlotSeparationOfScales, PlotSampleUncertaintyFromBootstrap + PlotSeparationOfScales, PlotSampleUncertaintyFromBootstrap, PlotSectorialSkillScore from mlair.plotting.data_insight_plotting import PlotStationMap, PlotAvailability, PlotAvailabilityHistogram, \ PlotPeriodogram, PlotDataHistogram from mlair.run_modules.run_environment import RunEnvironment @@ -85,6 +86,7 @@ class PostProcessing(RunEnvironment): self._sampling = self.data_store.get("sampling") self.window_lead_time = extract_value(self.data_store.get("output_shape", "model")) self.skill_scores = None + self.skill_score_per_sector = None self.feature_importance_skill_scores = None self.uncertainty_estimate = None self.competitor_path = self.data_store.get("competitor_path") @@ -100,6 +102,8 @@ class PostProcessing(RunEnvironment): self.uncertainty_estimate_boot_dim = "boots" self.model_type_dim = "type" self.index_dim = "index" + self.iter_dim = self.data_store.get("iter_dim") + self.upstream_wind_sector = None self.model_display_name = self.data_store.get_default("model_display_name", default=self.model.model_name) self._run() @@ -111,6 +115,13 @@ class PostProcessing(RunEnvironment): self.make_prediction(self.test_data) self.make_prediction(self.train_val_data) + # load upstream wind sector for test_data + try: + self.load_upstream_wind_sector(name_of_set="test") + self.skill_score_per_sector = self.calculate_error_metrics_based_on_upstream_wind_dir() + except Exception as e: + logging.info(f"Can not process upsstream wind sectors due to: {e}") + # calculate error metrics on test data self.calculate_test_score() @@ -140,6 +151,17 @@ class PostProcessing(RunEnvironment): # plotting self.plot() + def load_upstream_wind_sector(self, name_of_set): + path = os.path.join(self.data_store.get("experiment_path"), + f"data/*_{self.data_store.get('start', name_of_set)}_{self.data_store.get('end', name_of_set)}_upstream_wind_sector.nc") + iter_dim = self.data_store.get("iter_dim") + ds = xr.open_mfdataset(path).to_array(iter_dim) + try: + ds = ds.rename({'XTIME': self.index_dim}) + except ValueError as e: + logging.warning("Dimension `XTIME' does not exist") + self.upstream_wind_sector = ds + @TimeTrackingWrapper def estimate_sample_uncertainty(self, separate_ahead=False): """ @@ -537,6 +559,16 @@ class PostProcessing(RunEnvironment): logging.error(f"Could not create plot PlotCompetitiveSkillScore due to the following error: {e}" f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") + try: + if "PlotSectorialSkillScore" in plot_list: + PlotSectorialSkillScore(self.skill_score_per_sector, plot_folder=self.plot_path, + model_setup=self.model_display_name, sampling=self._sampling, + model_name_for_plots=self.model_name_for_plots, ahead_dim=self.ahead_dim + ) + except Exception as e: + logging.error(f"Could not create plot PlotSectorialSkillScore due to the following error: {e}" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") + try: if "PlotTimeSeries" in plot_list: PlotTimeSeries(self.test_data.keys(), path, r"forecasts_%s_test.nc", plot_folder=self.plot_path, @@ -900,6 +932,45 @@ class PostProcessing(RunEnvironment): except (TypeError, AttributeError): return forecast if competitor is None else competitor + @TimeTrackingWrapper + def load_forecast_array_and_store_as_dataset(self): + path = self.data_store.get("forecast_path") + all_stations = self.data_store.get("stations") + for station in all_stations: + external_data = self._get_external_data(station, path) # test data + if external_data is not None: + external_data.coords[self.model_type_dim] = [ + {self.forecast_indicator: self.model_display_name}.get(n, n) + for n in external_data.coords[self.model_type_dim].values] + external_data_expd = external_data.assign_coords({self.iter_dim: station}) + external_data_expd = external_data_expd.expand_dims(self.iter_dim).to_dataset(self.iter_dim) + external_data_expd.to_netcdf(os.path.join(path, f"forecasts_ds_{str(station)}_test.nc")) + + def calculate_error_metrics_based_on_upstream_wind_dir(self): + self.load_forecast_array_and_store_as_dataset() + path = self.data_store.get("forecast_path") + files = os.path.join(path, "forecasts_ds_*_test.nc") + ds = xr.open_mfdataset(files) + ds = ds.to_array(self.iter_dim) + wind_sectors = self.data_store.get("wind_sectors", "general") + sector_collector = dict() + h_sector_skill_scores = [] + for sec in wind_sectors: + h_sector_skill_scores.append(statistics.skill_score_based_on_mse( + ds.where(self.upstream_wind_sector.squeeze() == sec), + obs_name=self.observation_indicator, pred_name=self.model_display_name, + ref_name="ols").assign_coords({"sector": sec}) + ) + + #sec_coords = da.argwhere(self.upstream_wind_sector.squeeze() == sec) + #sector_collector[sec] = dict() + #for i, dim in enumerate(self.upstream_wind_sector.squeeze().dims): + # sector_collector[sec][dim] = sec_coords[:,i] + sector_skill_scores = xr.concat(h_sector_skill_scores, dim="sector") + #.to_dataframe("data")[["data"]].stack(level=0).reset_index(level=2, drop=True).reset_index(name="data") + return sector_skill_scores + + def calculate_error_metrics(self) -> Tuple[Dict, Dict, Dict, Dict]: """ Calculate error metrics and skill scores of NN forecast.