"""Post-processing module."""
__author__ = "Lukas Leufen, Felix Kleinert"
__date__ = '2019-12-11'
import inspect
import logging
import os
import sys
import traceback
import copy
from typing import Dict, Tuple, Union, List, Callable
import numpy as np
import pandas as pd
import xarray as xr
import datetime as dt
from mlair.configuration import path_config
from mlair.data_handler import Bootstraps, KerasIterator
from mlair.helpers.datastore import NameNotFoundInDataStore, NameNotFoundInScope
from mlair.helpers import TimeTracking, TimeTrackingWrapper, statistics, extract_value, remove_items, to_list, tables
from mlair.model_modules.linear_model import OrdinaryLeastSquaredModel
from mlair.model_modules import AbstractModelClass
from mlair.plotting.postprocessing_plotting import PlotMonthlySummary, PlotClimatologicalSkillScore, \
PlotCompetitiveSkillScore, PlotTimeSeries, PlotFeatureImportanceSkillScore, PlotConditionalQuantiles, \
PlotSeparationOfScales, PlotSampleUncertaintyFromBootstrap, PlotTimeEvolutionMetric
from mlair.plotting.data_insight_plotting import PlotStationMap, PlotAvailability, PlotAvailabilityHistogram, \
PlotPeriodogram, PlotDataHistogram
from mlair.run_modules.run_environment import RunEnvironment
class PostProcessing(RunEnvironment):
"""
Perform post-processing for performance evaluation.
Schedule of post-processing:
#. train an ordinary least squares model (ols) for reference
#. create forecasts for nn, ols, and persistence
#. evaluate feature importance with bootstrapped predictions
#. calculate skill scores
#. create plots
Required objects [scope] from data store:
* `model` [.] or locally saved model plus `model_name` [model] and `model` [model]
* `generator` [train, val, test, train_val]
* `forecast_path` [.]
* `plot_path` [postprocessing]
* `model_path` [.]
* `target_var` [.]
* `sampling` [.]
* `output_shape` [model]
* `evaluate_feature_importance` [postprocessing] and if enabled:
* `create_new_bootstraps` [postprocessing]
* `bootstrap_path` [postprocessing]
* `number_of_bootstraps` [postprocessing]
Optional objects
* `batch_size` [model]
Creates
* forecasts in `forecast_path` if enabled
* bootstraps in `bootstrap_path` if enabled
* plots in `plot_path`
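A typical invocation inside a run script might look like the following sketch. Note that the import paths of the
other run modules and the `ExperimentSetup` arguments are assumptions based on the standard MLAir workflow and are
not defined in this module:

.. code-block:: python

    from mlair.run_modules.run_environment import RunEnvironment
    # the following imports are assumed locations of the other run modules
    from mlair.run_modules.experiment_setup import ExperimentSetup
    from mlair.run_modules.pre_processing import PreProcessing
    from mlair.run_modules.model_setup import ModelSetup
    from mlair.run_modules.training import Training
    from mlair.run_modules.post_processing import PostProcessing

    with RunEnvironment():
        ExperimentSetup(stations=["DEBW107"])  # hypothetical station id and arguments
        PreProcessing()
        ModelSetup()
        Training()
        PostProcessing()  # runs the complete post-processing on instantiation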
"""
def __init__(self):
"""Initialise and run post-processing."""
super().__init__()
self.model: AbstractModelClass = self._load_model()
self.model_name = self.data_store.get("model_name", "model").rsplit("/", 1)[1].split(".", 1)[0]
self.ols_model = None
self.batch_size: int = self.data_store.get_default("batch_size", "model", 64)
self.test_data = self.data_store.get("data_collection", "test")
batch_path = self.data_store.get("batch_path", scope="test")
self.test_data_distributed = KerasIterator(self.test_data, self.batch_size, model=self.model, name="test",
batch_path=batch_path)
self.train_data = self.data_store.get("data_collection", "train")
self.val_data = self.data_store.get("data_collection", "val")
self.train_val_data = self.data_store.get("data_collection", "train_val")
self.forecast_path = self.data_store.get("forecast_path")
self.plot_path: str = self.data_store.get("plot_path")
self.target_var = self.data_store.get("target_var")
self._sampling = self.data_store.get("sampling")
self.window_lead_time = extract_value(self.data_store.get("output_shape", "model"))
self.skill_scores = None
self.feature_importance_skill_scores = None
self.uncertainty_estimate = None
self.uncertainty_estimate_seasons = {}
self.block_mse_per_station = None
self.competitor_path = self.data_store.get("competitor_path")
self.competitors = to_list(self.data_store.get_default("competitors", default=[]))
self.forecast_indicator = "nn"
self.observation_indicator = "obs"
self.ahead_dim = "ahead"
self.boot_var_dim = "boot_var"
self.uncertainty_estimate_boot_dim = "boots"
self.model_type_dim = "type"
self.index_dim = "index"
self.model_display_name = self.data_store.get_default("model_display_name", default=self.model.model_name)
self._run()
def _run(self):
# ols model
self.train_ols_model()
# forecasts on test data
self.make_prediction(self.test_data)
self.make_prediction(self.train_val_data)
# calculate error metrics on test data
self.calculate_test_score()
# sample uncertainty
if self.data_store.get("do_uncertainty_estimate", "postprocessing"):
self.estimate_sample_uncertainty(separate_ahead=True)
# feature importance bootstraps
if self.data_store.get("evaluate_feature_importance", "postprocessing"):
with TimeTracking(name="evaluate_feature_importance", log_on_enter=True):
create_new_bootstraps = self.data_store.get("create_new_bootstraps", "feature_importance")
bootstrap_method = self.data_store.get("bootstrap_method", "feature_importance")
bootstrap_type = self.data_store.get("bootstrap_type", "feature_importance")
self.calculate_feature_importance(create_new_bootstraps, bootstrap_type=bootstrap_type,
bootstrap_method=bootstrap_method)
if self.feature_importance_skill_scores is not None:
self.report_feature_importance_results(self.feature_importance_skill_scores)
# skill scores and error metrics
with TimeTracking(name="calculate_error_metrics", log_on_enter=True):
skill_score_competitive, _, skill_score_climatological, errors = self.calculate_error_metrics()
self.skill_scores = (skill_score_competitive, skill_score_climatological)
with TimeTracking(name="report_error_metrics", log_on_enter=True):
self.report_error_metrics(errors)
self.report_error_metrics({self.forecast_indicator: skill_score_climatological})
self.report_error_metrics({"skill_score": skill_score_competitive})
# plotting
self.plot()
@TimeTrackingWrapper
def estimate_sample_uncertainty(self, separate_ahead=False):
"""
Estimate sample uncertainty using a bootstrap approach. Forecasts are split into individual blocks along the time
axis, and the blocks are drawn randomly with replacement. The spread of the resulting error distribution indicates
how robust the error estimate of each analyzed model is and helps to assess which model might be superior to the
others.
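The idea in a self-contained toy sketch (plain numpy, independent of the helpers in `statistics`; all values are
illustrative):

.. code-block:: python

    import numpy as np

    rng = np.random.default_rng(0)
    block_mse = rng.random(24)  # e.g. one MSE value per monthly block
    n_boots = 1000
    # draw blocks with replacement and average them to get one error estimate per realization
    idx = rng.integers(0, len(block_mse), size=(n_boots, len(block_mse)))
    realizations = block_mse[idx].mean(axis=1)
    spread = realizations.std()  # a small spread indicates a robust error estimate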
"""
logging.info("start estimate_sample_uncertainty")
n_boots = self.data_store.get_default("n_boots", default=1000, scope="uncertainty_estimate")
block_length = self.data_store.get_default("block_length", default="1m", scope="uncertainty_estimate")
evaluate_competitors = self.data_store.get_default("evaluate_competitors", default=True,
scope="uncertainty_estimate")
block_mse, block_mse_per_station = self.calculate_block_mse(evaluate_competitors=evaluate_competitors,
separate_ahead=separate_ahead,
block_length=block_length)
self.block_mse_per_station = block_mse_per_station
estimate = statistics.create_n_bootstrap_realizations(
block_mse, dim_name_time=self.index_dim, dim_name_model=self.model_type_dim,
dim_name_boots=self.uncertainty_estimate_boot_dim, n_boots=n_boots, seasons=["DJF", "MAM", "JJA", "SON"])
self.uncertainty_estimate = estimate.pop("")
self.uncertainty_estimate_seasons = estimate
self.report_sample_uncertainty()
def report_sample_uncertainty(self, percentiles: list = None):
"""
Store the raw results of the uncertainty estimate as netcdf, calculate aggregate statistics, and store these as
csv as well as markdown and latex tables.
"""
report_path = os.path.join(self.data_store.get("experiment_path"), "latex_report")
path_config.check_path_and_create(report_path)
# store raw results as nc
file_name = os.path.join(report_path, "uncertainty_estimate_raw_results.nc")
self.uncertainty_estimate.to_netcdf(path=file_name)
for season in self.uncertainty_estimate_seasons.keys():
file_name = os.path.join(report_path, f"uncertainty_estimate_raw_results_{season}.nc")
self.uncertainty_estimate_seasons[season].to_netcdf(path=file_name)
# store statistics
if percentiles is None:
percentiles = [.05, .1, .25, .5, .75, .9, .95]
for season in [None] + list(self.uncertainty_estimate_seasons.keys()):
estimate = self.uncertainty_estimate if season is None else self.uncertainty_estimate_seasons[season]
affix = "" if season is None else f"_{season}"
for ahead_steps in ["single", "multi"]:
if ahead_steps == "single":
try:
df_descr = estimate.to_pandas().describe(percentiles=percentiles).astype("float32")
except ValueError:
df_descr = estimate.mean(self.ahead_dim).to_pandas().describe(percentiles=percentiles).astype("float32")
else:
if self.ahead_dim not in estimate.dims:
continue
df_descr = estimate.to_dataframe(self.model_type_dim).unstack().groupby(level=self.ahead_dim).describe(
percentiles=percentiles).astype("float32")
df_descr = df_descr.stack(-1)
df_descr = df_descr.reorder_levels(df_descr.index.names[::-1])
df_sorter = ["count", "mean", "std", "min", *[f"{round(p * 100)}%" for p in percentiles], "max"]
df_descr = df_descr.loc[df_sorter]
column_format = tables.create_column_format_for_tex(df_descr)
file_name = os.path.join(report_path, f"uncertainty_estimate_statistics_{ahead_steps}{affix}.%s")
tables.save_to_tex(report_path, file_name % "tex", column_format=column_format, df=df_descr)
tables.save_to_md(report_path, file_name % "md", df=df_descr)
df_descr.to_csv(file_name % "csv", sep=";")
def calculate_block_mse(self, evaluate_competitors=True, separate_ahead=False, block_length="1m"):
"""
Transform data into blocks along the time axis. The block length can be any frequency like '1m' or '7d'. Data are
only split along the time axis, which means that a single block can differ strongly in the number of stations and
the amount of actual data it contains. This is intended to analyze the robustness not only against time but also
against the number of observations and the diversity of stations.
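A toy sketch of the blocking step for a single station (monthly blocks via xarray resampling; names and values are
illustrative only):

.. code-block:: python

    import numpy as np
    import pandas as pd
    import xarray as xr

    index = pd.date_range("2010-01-01", "2010-12-31", freq="1D")
    squared_errors = xr.DataArray(np.random.random(len(index)) ** 2,
                                  coords={"index": index}, dims=["index"])
    # mse for each monthly block, ignoring missing values
    block_mse = squared_errors.resample(indexer={"index": "1M"}).mean(skipna=True)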
"""
all_stations = self.data_store.get("stations", "test")
start = self.data_store.get("start", "test")
end = self.data_store.get("end", "test")
index_dim = self.index_dim
coll_dim = "station"
collector = []
for station in all_stations:
# test data
external_data = self._get_external_data(station, self.forecast_path)
if external_data is None:
logging.info(f"skip calculate_block_mse for {station} as no external_data are available")
continue
# competitors
if evaluate_competitors is True:
competitor = self.load_competitors(station)
combined = self._combine_forecasts(external_data, competitor, dim=self.model_type_dim)
else:
combined = external_data
if combined is None:
continue
else:
combined = self.create_full_time_dim(combined, index_dim, self._sampling, start, end)
# get squared errors
errors = self.create_error_array(combined)
# calc mse for each block (single station)
mse = errors.resample(indexer={index_dim: block_length}).mean(skipna=True)
collector.append(mse.assign_coords({coll_dim: station}))
# combine all mse blocks
mse_blocks_per_station = xr.concat(collector, dim=coll_dim)
# calc mse for each block (average over all stations)
mse_blocks = mse_blocks_per_station.mean(dim=coll_dim, skipna=True)
# average also on ahead steps
if separate_ahead is False:
mse_blocks = mse_blocks.mean(dim=self.ahead_dim, skipna=True)
mse_blocks_per_station = mse_blocks_per_station.mean(dim=self.ahead_dim, skipna=True)
return mse_blocks, mse_blocks_per_station
def create_error_array(self, data):
"""Calculate squared error of all given time series in relation to observation."""
errors = data.drop_sel({self.model_type_dim: self.observation_indicator})
errors1 = errors - data.sel({self.model_type_dim: self.observation_indicator})
errors2 = errors1 ** 2
return errors2
@staticmethod
def create_full_time_dim(data, dim, sampling, start, end):
"""Ensure time dimension to be equidistant. Sometimes dates if missing values have been dropped."""
start_data = data.coords[dim].values[0]
freq = {"daily": "1D", "hourly": "1H"}.get(sampling)
_ind = pd.date_range(start, end, freq=freq) # two steps required to include all hours of end interval
datetime_index = pd.DataFrame(index=pd.date_range(_ind.min(), _ind.max() + dt.timedelta(days=1),
closed="left", freq=freq))
t = data.sel({dim: start_data}, drop=True)
res = xr.DataArray(coords=[datetime_index.index, *[t.coords[c] for c in t.coords]], dims=[dim, *t.coords])
res = res.transpose(*data.dims)
if data.shape == res.shape:
res.loc[data.coords] = data
else:
_d = data.sel({dim: slice(start, end)})
res.loc[_d.coords] = _d
return res
def load_competitors(self, station_name: str) -> xr.DataArray:
"""
Load all requested and available competitors for a given station. Forecasts must be available in the competitor
path like `<competitor_path>/<competitor_name>/forecasts_<station_name>_test.nc`. The naming style is the same for
all MLAir forecasts, so forecasts from a different experiment can easily be copied into the competitor path
without any change.
:param station_name: station indicator to load competitors for
:return: a single xarray with all competing forecasts
"""
competing_predictions = []
for competitor_name in self.competitors:
try:
prediction = self._create_competitor_forecast(station_name, competitor_name)
competing_predictions.append(prediction)
except (FileNotFoundError, KeyError):
logging.debug(f"No competitor found for combination '{station_name}' and '{competitor_name}'.")
continue
return xr.concat(competing_predictions, self.model_type_dim) if len(competing_predictions) > 0 else None
def calculate_feature_importance(self, create_new_bootstraps: bool, _iter: int = 0, bootstrap_type="singleinput",
bootstrap_method="shuffle") -> None:
"""
Calculate skill scores of bootstrapped data.
Create bootstrapped data if create_new_bootstraps is true or if a failure occurred during the skill score
calculation (this happens by default if no bootstrapped data is available locally). Sets the class attribute
feature_importance_skill_scores. This method is implemented in a recursive fashion, but is only allowed to call
itself once.
:param create_new_bootstraps: calculate all bootstrap predictions and overwrite already available predictions
:param _iter: internal counter to reduce unnecessary recursive calls (maximum number is 2, otherwise something
went wrong).
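After the run, the results can be accessed as sketched below (the station id is illustrative; `post_processing`
denotes a hypothetical instance of this class):

.. code-block:: python

    # feature_importance_skill_scores[bootstrap_type][bootstrap_method] maps each station
    # to an xarray.DataArray with the dimensions (ahead, boots, boot_var)
    skill = post_processing.feature_importance_skill_scores["singleinput"]["shuffle"]
    station_scores = skill["DEBW107"]  # hypothetical station id
    mean_skill = station_scores.mean(dim="boots")  # average skill score per variable and lead time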
"""
if _iter == 0:
self.feature_importance_skill_scores = {}
for boot_type in to_list(bootstrap_type):
if _iter == 0:
self.feature_importance_skill_scores[boot_type] = {}
for boot_method in to_list(bootstrap_method):
try:
if create_new_bootstraps:
self.create_feature_importance_bootstrap_forecast(bootstrap_type=boot_type,
bootstrap_method=boot_method)
boot_skill_score = self.calculate_feature_importance_skill_scores(bootstrap_type=boot_type,
bootstrap_method=boot_method)
self.feature_importance_skill_scores[boot_type][boot_method] = boot_skill_score
except (FileNotFoundError, ValueError, OSError):
if _iter != 0:
raise RuntimeError(f"calculate_feature_importance ({boot_type}, {boot_method}) was called for "
f"the 2nd time. This means, that something internally goes wrong. Please "
f"check for possible errors.")
logging.info(f"Could not load all files for feature importance ({boot_type}, {boot_method}), "
f"restart calculate_feature_importance with create_new_bootstraps=True.")
self.calculate_feature_importance(True, _iter=1, bootstrap_type=boot_type,
bootstrap_method=boot_method)
def create_feature_importance_bootstrap_forecast(self, bootstrap_type, bootstrap_method) -> None:
"""
Create bootstrapped predictions for all stations and variables.
These forecasts are saved in `forecast_path` with the names `bootstraps_{station}_{var}_{type}_{method}.nc` and
`bootstraps_{station}_{method}_labels.nc`.
"""
def _reshape(d, pos):
if isinstance(d, list):
return list(map(lambda x: _reshape(x, pos), d))
else:
return d[..., pos]
# forecast
with TimeTracking(name=f"{inspect.stack()[0].function} ({bootstrap_type}, {bootstrap_method})",
log_on_enter=True):
# extract all requirements from data store
number_of_bootstraps = self.data_store.get("n_boots", "feature_importance")
dims = [self.uncertainty_estimate_boot_dim, self.index_dim, self.ahead_dim, self.model_type_dim]
for station in self.test_data:
X, Y = None, None
bootstraps = Bootstraps(station, number_of_bootstraps, bootstrap_type=bootstrap_type,
bootstrap_method=bootstrap_method)
number_of_bootstraps = bootstraps.number_of_bootstraps
for boot in bootstraps:
X, Y, (index, dimension) = boot
# make bootstrap predictions
bootstrap_predictions = [self.model.predict(_reshape(X, pos)) for pos in range(number_of_bootstraps)]
if isinstance(bootstrap_predictions[0], list): # if model is branched model
bootstrap_predictions = list(map(lambda x: x[-1], bootstrap_predictions))
# save bootstrap predictions separately for each station and variable combination
bootstrap_predictions = list(map(lambda x: np.expand_dims(x, axis=-1), bootstrap_predictions))
shape = bootstrap_predictions[0].shape
coords = (range(number_of_bootstraps), range(shape[0]), range(1, shape[1] + 1))
var = f"{index}_{dimension}" if index is not None else str(dimension)
tmp = xr.DataArray(bootstrap_predictions, coords=(*coords, [var]), dims=dims)
file_name = os.path.join(self.forecast_path,
f"bootstraps_{station}_{var}_{bootstrap_type}_{bootstrap_method}.nc")
tmp.to_netcdf(file_name)
else:
# store also true labels for each station
labels = np.expand_dims(Y[..., 0], axis=-1)
file_name = os.path.join(self.forecast_path, f"bootstraps_{station}_{bootstrap_method}_labels.nc")
labels = xr.DataArray(labels, coords=(*coords[1:], [self.observation_indicator]), dims=dims[1:])
labels.to_netcdf(file_name)
def calculate_feature_importance_skill_scores(self, bootstrap_type, bootstrap_method) -> Dict[str, xr.DataArray]:
"""
Calculate skill score of bootstrapped variables.
Use the already created bootstrap predictions and the original predictions (the not-bootstrapped ones) to
calculate skill scores for the bootstraps. The result is returned as one xarray DataArray per station, collected
in a dictionary keyed by station.
:return: The result dictionary with station-wise skill scores
"""
with TimeTracking(name=f"{inspect.stack()[0].function} ({bootstrap_type}, {bootstrap_method})"):
# extract all requirements from data store
number_of_bootstraps = self.data_store.get("n_boots", "feature_importance")
forecast_file = "forecasts_norm_%s_test.nc"
reference_name = "orig"
branch_names = self.data_store.get_default("branch_names", None)
bootstraps = Bootstraps(self.test_data[0], number_of_bootstraps, bootstrap_type=bootstrap_type,
bootstrap_method=bootstrap_method)
number_of_bootstraps = bootstraps.number_of_bootstraps
bootstrap_iter = bootstraps.bootstraps()
branch_length = self.get_distinct_branches_from_bootstrap_iter(bootstrap_iter)
skill_scores = statistics.SkillScores(None, ahead_dim=self.ahead_dim, type_dim=self.model_type_dim,
index_dim=self.index_dim, observation_name=self.observation_indicator)
score = {}
for station in self.test_data:
# get station labels
file_name = os.path.join(self.forecast_path, f"bootstraps_{str(station)}_{bootstrap_method}_labels.nc")
with xr.open_dataarray(file_name) as da:
labels = da.load()
# get original forecasts
orig = self.get_orig_prediction(self.forecast_path, forecast_file % str(station),
reference_name=reference_name)
orig.coords[self.index_dim] = labels.coords[self.index_dim]
# calculate skill scores for each variable
skill = []
for boot_set in bootstrap_iter:
boot_var = f"{boot_set[0]}_{boot_set[1]}" if isinstance(boot_set, tuple) else str(boot_set)
file_name = os.path.join(self.forecast_path,
f"bootstraps_{station}_{boot_var}_{bootstrap_type}_{bootstrap_method}.nc")
with xr.open_dataarray(file_name) as da:
boot_data = da.load()
boot_data = boot_data.combine_first(labels).combine_first(orig)
boot_scores = []
for ahead in range(1, self.window_lead_time + 1):
data = boot_data.sel({self.ahead_dim: ahead})
boot_scores.append(
skill_scores.general_skill_score(data, forecast_name=boot_var,
reference_name=reference_name, dim=self.index_dim))
boot_var_renamed = self.rename_boot_var_with_branch(boot_var, bootstrap_type, branch_names, expected_len=branch_length)
tmp = xr.DataArray(np.expand_dims(np.array(boot_scores), axis=-1),
coords={self.ahead_dim: range(1, self.window_lead_time + 1),
self.uncertainty_estimate_boot_dim: range(number_of_bootstraps),
self.boot_var_dim: [boot_var_renamed]},
dims=[self.ahead_dim, self.uncertainty_estimate_boot_dim, self.boot_var_dim])
skill.append(tmp)
# collect all results in single dictionary
score[str(station)] = xr.concat(skill, dim=self.boot_var_dim)
return score
@staticmethod
def get_distinct_branches_from_bootstrap_iter(bootstrap_iter):
if isinstance(bootstrap_iter[0], tuple):
return len(set(map(lambda x: x[0], bootstrap_iter)))
else:
return len(bootstrap_iter)
def rename_boot_var_with_branch(self, boot_var, bootstrap_type, branch_names=None, expected_len=0):
if branch_names is None:
return boot_var
if bootstrap_type == "branch":
try:
assert len(branch_names) > int(boot_var)
assert len(branch_names) == expected_len
return branch_names[int(boot_var)]
except (AssertionError, TypeError):
return boot_var
elif bootstrap_type == "singleinput":
if "_" in boot_var:
branch, other = boot_var.split("_", 1)
branch = self.rename_boot_var_with_branch(branch, "branch", branch_names=branch_names, expected_len=expected_len)
boot_var = "_".join([branch, other])
return boot_var
return boot_var
def get_orig_prediction(self, path, file_name, prediction_name=None, reference_name=None):
if prediction_name is None:
prediction_name = self.forecast_indicator
file = os.path.join(path, file_name)
with xr.open_dataarray(file) as da:
prediction = da.load().sel({self.model_type_dim: [prediction_name]})
if reference_name is not None:
prediction.coords[self.model_type_dim] = [reference_name]
return prediction.dropna(dim=self.index_dim)
@staticmethod
def repeat_data(data, number_of_repetition):
if isinstance(data, xr.DataArray):
data = data.data
return np.repeat(np.expand_dims(data, axis=-1), number_of_repetition, axis=-1)
def _get_model_name(self):
"""Return model name without path information."""
return self.data_store.get("model_name", "model").rsplit("/", 1)[1].split(".", 1)[0]
def _load_model(self) -> AbstractModelClass:
"""
Load NN model either from data store or from local path.
:return: the model
"""
try: # is only available if a model was trained in training stage
model = self.data_store.get("model")
except (NameNotFoundInDataStore, NameNotFoundInScope):
logging.info("No model was saved in data store. Try to load model from experiment path.")
model_name = self.data_store.get("model_name", "model")
model: AbstractModelClass = self.data_store.get("model", "model")
model.load_model(model_name)
return model
# noinspection PyBroadException
def plot(self):
"""
Create all plots.
Plots are defined in the experiment setup via `plot_list`. By default, all of the following plots are enabled:
* :py:class:`PlotBootstrapSkillScore <src.plotting.postprocessing_plotting.PlotBootstrapSkillScore>`
* :py:class:`PlotConditionalQuantiles <src.plotting.postprocessing_plotting.PlotConditionalQuantiles>`
* :py:class:`PlotStationMap <src.plotting.postprocessing_plotting.PlotStationMap>`
* :py:class:`PlotMonthlySummary <src.plotting.postprocessing_plotting.PlotMonthlySummary>`
* :py:class:`PlotClimatologicalSkillScore <src.plotting.postprocessing_plotting.PlotClimatologicalSkillScore>`
* :py:class:`PlotCompetitiveSkillScore <src.plotting.postprocessing_plotting.PlotCompetitiveSkillScore>`
* :py:class:`PlotTimeSeries <src.plotting.postprocessing_plotting.PlotTimeSeries>`
* :py:class:`PlotAvailability <src.plotting.postprocessing_plotting.PlotAvailability>`
.. note:: Bootstrap plots are only created if bootstraps are evaluated.
"""
logging.info("Run plotting routines...")
use_multiprocessing = self.data_store.get("use_multiprocessing")
plot_list = self.data_store.get("plot_list", "postprocessing")
time_dim = self.data_store.get("time_dim")
window_dim = self.data_store.get("window_dim")
target_dim = self.data_store.get("target_dim")
iter_dim = self.data_store.get("iter_dim")
try:
if ("filter" in self.test_data[0].get_X(as_numpy=False)[0].coords) and (
"PlotSeparationOfScales" in plot_list):
filter_dim = self.data_store.get_default("filter_dim", None)
PlotSeparationOfScales(self.test_data, plot_folder=self.plot_path, time_dim=time_dim,
window_dim=window_dim, target_dim=target_dim, **{"filter_dim": filter_dim})
except Exception as e:
logging.error(f"Could not create plot PlotSeparationOfScales due to the following error:"
f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
try:
if (self.feature_importance_skill_scores is not None) and ("PlotFeatureImportanceSkillScore" in plot_list):
branch_names = self.data_store.get_default("branch_names", None)
for boot_type, boot_data in self.feature_importance_skill_scores.items():
for boot_method, boot_skill_score in boot_data.items():
try:
PlotFeatureImportanceSkillScore(
boot_skill_score, plot_folder=self.plot_path, model_name=self.model_display_name,
sampling=self._sampling, ahead_dim=self.ahead_dim,
separate_vars=to_list(self.target_var), bootstrap_type=boot_type,
bootstrap_method=boot_method, branch_names=branch_names)
except Exception as e:
logging.error(f"Could not create plot PlotFeatureImportanceSkillScore ({boot_type}, "
f"{boot_method}) due to the following error:\n{sys.exc_info()[0]}\n"
f"{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
except Exception as e:
logging.error(f"Could not create plot PlotFeatureImportanceSkillScore due to the following error: {e}")
try:
if "PlotConditionalQuantiles" in plot_list:
PlotConditionalQuantiles(self.test_data.keys(), data_pred_path=self.forecast_path,
plot_folder=self.plot_path, forecast_indicator=self.forecast_indicator,
obs_indicator=self.observation_indicator)
except Exception as e:
logging.error(f"Could not create plot PlotConditionalQuantiles due to the following error:"
f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
try:
if "PlotMonthlySummary" in plot_list:
PlotMonthlySummary(self.test_data.keys(), self.forecast_path, r"forecasts_%s_test.nc", self.target_var,
plot_folder=self.plot_path)
except Exception as e:
logging.error(f"Could not create plot PlotMonthlySummary due to the following error:"
f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
try:
if "PlotClimatologicalSkillScore" in plot_list:
PlotClimatologicalSkillScore(self.skill_scores[1], plot_folder=self.plot_path,
model_name=self.model_display_name)
PlotClimatologicalSkillScore(self.skill_scores[1], plot_folder=self.plot_path, score_only=False,
extra_name_tag="all_terms_", model_name=self.model_display_name)
except Exception as e:
logging.error(f"Could not create plot PlotClimatologicalSkillScore due to the following error: {e}"
f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
try:
if "PlotCompetitiveSkillScore" in plot_list:
PlotCompetitiveSkillScore(self.skill_scores[0], plot_folder=self.plot_path,
model_setup=self.model_display_name)
except Exception as e:
logging.error(f"Could not create plot PlotCompetitiveSkillScore due to the following error: {e}"
f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
try:
if "PlotTimeSeries" in plot_list:
PlotTimeSeries(self.test_data.keys(), self.forecast_path, r"forecasts_%s_test.nc",
plot_folder=self.plot_path, sampling=self._sampling, ahead_dim=self.ahead_dim)
except Exception as e:
logging.error(f"Could not create plot PlotTimeSeries due to the following error:\n{sys.exc_info()[0]}\n"
f"{sys.exc_info()[1]}\n{sys.exc_info()[2]}\n{traceback.format_exc()}")
try:
if "PlotSampleUncertaintyFromBootstrap" in plot_list and self.uncertainty_estimate is not None:
block_length = self.data_store.get_default("block_length", default="1m", scope="uncertainty_estimate")
for season in [None] + list(self.uncertainty_estimate_seasons.keys()):
estimate = self.uncertainty_estimate if season is None else self.uncertainty_estimate_seasons[season]
PlotSampleUncertaintyFromBootstrap(
data=estimate, plot_folder=self.plot_path, model_type_dim=self.model_type_dim,
dim_name_boots=self.uncertainty_estimate_boot_dim, error_measure="mean squared error",
error_unit=r"ppb$^2$", block_length=block_length, model_name=self.model_display_name,
model_indicator=self.forecast_indicator, sampling=self._sampling, season_annotation=season)
except Exception as e:
logging.error(f"Could not create plot PlotSampleUncertaintyFromBootstrap due to the following error: {e}"
f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
try:
if "PlotStationMap" in plot_list:
gens = [(self.train_data, {"marker": 5, "ms": 9}),
(self.val_data, {"marker": 6, "ms": 9}),
(self.test_data, {"marker": 4, "ms": 9})]
PlotStationMap(generators=gens, plot_folder=self.plot_path)
gens = [(self.train_val_data, {"marker": 8, "ms": 9}),
(self.test_data, {"marker": 9, "ms": 9})]
PlotStationMap(generators=gens, plot_folder=self.plot_path, plot_name="station_map_var")
except Exception as e:
if self.data_store.get("hostname")[:2] in self.data_store.get("hpc_hosts") or self.data_store.get("hostname")[:6] in self.data_store.get("hpc_hosts"):
logging.info(f"PlotStationMap might have failed as current workflow is running on hpc node {self.data_store.get('hostname')}. To download geographic elements, please run PlotStationMap once on login node.")
logging.error(f"Could not create plot PlotStationMap due to the following error: {e}"
f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
try:
if "PlotAvailability" in plot_list:
avail_data = {"train": self.train_data, "val": self.val_data, "test": self.test_data}
PlotAvailability(avail_data, plot_folder=self.plot_path, time_dimension=time_dim,
window_dimension=window_dim)
except Exception as e:
logging.error(f"Could not create plot PlotAvailability due to the following error: {e}"
f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
try:
if "PlotAvailabilityHistogram" in plot_list:
avail_data = {"train": self.train_data, "val": self.val_data, "test": self.test_data}
PlotAvailabilityHistogram(avail_data, plot_folder=self.plot_path, station_dim=iter_dim,
history_dim=window_dim)
except Exception as e:
logging.error(f"Could not create plot PlotAvailabilityHistogram due to the following error: {e}"
f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
try:
if "PlotDataHistogram" in plot_list:
upsampling = self.data_store.get_default("upsampling", scope="train", default=False)
gens = {"train": self.train_data, "val": self.val_data, "test": self.test_data}
PlotDataHistogram(gens, plot_folder=self.plot_path, time_dim=time_dim, variables_dim=target_dim,
upsampling=upsampling)
except Exception as e:
logging.error(f"Could not create plot PlotDataHistogram due to the following error: {e}"
f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
try:
if "PlotPeriodogram" in plot_list:
PlotPeriodogram(self.train_data, plot_folder=self.plot_path, time_dim=time_dim,
variables_dim=target_dim, sampling=self._sampling,
use_multiprocessing=use_multiprocessing)
except Exception as e:
logging.error(f"Could not create plot PlotPeriodogram due to the following error: {e}"
f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
try:
if "PlotTimeEvolutionMetric" in plot_list:
PlotTimeEvolutionMetric(self.block_mse_per_station, plot_folder=self.plot_path,
model_type_dim=self.model_type_dim, ahead_dim=self.ahead_dim,
error_measure="Mean Squared Error", error_unit=r"ppb$^2$",
model_indicator=self.forecast_indicator, model_name=self.model_display_name,
time_dim=self.index_dim)
except Exception as e:
logging.error(f"Could not create plot PlotTimeEvolutionMetric due to the following error: {e}"
f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}")
@TimeTrackingWrapper
def calculate_test_score(self):
"""Evaluate test score of model and save locally."""
logging.info(f"start to calculate test scores")
# test scores on transformed data
test_score = self.model.evaluate(self.test_data_distributed,
use_multiprocessing=True, verbose=0)
path = self.data_store.get("model_path")
with open(os.path.join(path, "test_scores.txt"), "a") as f:
for index, item in enumerate(to_list(test_score)):
logging.info(f"{self.model.metrics_names[index]} (test), {item}")
f.write(f"{self.model.metrics_names[index]}, {item}\n")
@TimeTrackingWrapper
def train_ols_model(self):
"""Train ordinary least squared model on train data."""
if "ols" in map(lambda x: x.lower(), self.competitors):
logging.info(f"start train_ols_model on train data")
self.ols_model = OrdinaryLeastSquaredModel(self.train_data)
self.competitors = [e for e in self.competitors if e.lower() != "ols"]
else:
logging.info(f"Skip train ols model as it is not present in competitors.")
@TimeTrackingWrapper
def make_prediction(self, subset):
"""
Create predictions for NN, OLS, and persistence and add true observation as reference.
Predictions are filled into an array covering the full index range and can therefore contain missing values. All
predictions for a single station are stored locally under `<forecasts/forecasts_norm>_<station>_<subset>.nc` and
can be found inside `forecast_path`.
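A sketch of how a stored forecast file can be inspected afterwards (path and station id are illustrative):

.. code-block:: python

    import os
    import xarray as xr

    file = os.path.join(forecast_path, "forecasts_DEBW107_test.nc")  # hypothetical path and station
    with xr.open_dataarray(file) as da:
        forecasts = da.load()
    # dimensions are (index, ahead, type); "type" holds e.g. "nn", "persi", "ols", and "obs"
    nn_forecast = forecasts.sel(type="nn")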
"""
subset_type = subset.name
logging.info(f"start make_prediction for {subset_type}")
time_dimension = self.data_store.get("time_dim")
window_dim = self.data_store.get("window_dim")
for i, data in enumerate(subset):
input_data = data.get_X()
target_data = data.get_Y(as_numpy=False)
observation_data = data.get_observation()
# get scaling parameters
transformation_func = data.apply_transformation
nn_output = self.model.predict(input_data)
for normalised in [True, False]:
# create empty arrays
nn_prediction, persistence_prediction, ols_prediction, observation = self._create_empty_prediction_arrays(
target_data, count=4)
# nn forecast
nn_prediction = self._create_nn_forecast(copy.deepcopy(nn_output), nn_prediction, transformation_func, normalised)
# persistence
persistence_prediction = self._create_persistence_forecast(observation_data, persistence_prediction,
transformation_func, normalised)
# ols
if self.ols_model is not None:
ols_prediction = self._create_ols_forecast(input_data, ols_prediction, transformation_func,
normalised)
else:
ols_prediction = None
# observation
observation = self._create_observation(target_data, observation, transformation_func, normalised)
# merge all predictions
full_index = self.create_fullindex(observation_data.indexes[time_dimension], self._get_frequency())
prediction_dict = {self.forecast_indicator: nn_prediction,
"persi": persistence_prediction,
self.observation_indicator: observation,
"ols": ols_prediction}
all_predictions = self.create_forecast_arrays(full_index, list(target_data.indexes[window_dim]),
time_dimension, ahead_dim=self.ahead_dim,
index_dim=self.index_dim, type_dim=self.model_type_dim,
**prediction_dict)
# save all forecasts locally
prefix = "forecasts_norm" if normalised is True else "forecasts"
file = os.path.join(self.forecast_path, f"{prefix}_{str(data)}_{subset_type}.nc")
all_predictions.to_netcdf(file)
def _get_frequency(self) -> str:
"""Get frequency abbreviation."""
getter = {"daily": "1D", "hourly": "1H"}
return getter.get(self._sampling, None)
def _create_competitor_forecast(self, station_name: str, competitor_name: str) -> xr.DataArray:
"""
Load and format the competing forecast of a distinct model indicated by `competitor_name` for a distinct station
indicated by `station_name`. The name of the competitor is set in the `type` axis as indicator. This method raises
either a `FileNotFoundError` or a `KeyError` if no competitor could be found for the given station: either there
is no file in the expected path, or the forecast file contains no forecast for the given `competitor_name`. The
forecast is trimmed to the start and end of the test subset interval.
:param station_name: name of the station to load data for
:param competitor_name: name of the model
:return: the forecast of the given competitor
"""
path = os.path.join(self.competitor_path, competitor_name)
file = os.path.join(path, f"forecasts_{station_name}_test.nc")
with xr.open_dataarray(file) as da:
data = da.load()
if self.forecast_indicator in data.coords[self.model_type_dim]:
forecast = data.sel({self.model_type_dim: [self.forecast_indicator]})
forecast.coords[self.model_type_dim] = [competitor_name]
else:
forecast = data.sel({self.model_type_dim: [competitor_name]})
# limit forecast to time range of test subset
start, end = self.data_store.get("start", "test"), self.data_store.get("end", "test")
return self.create_full_time_dim(forecast, self.index_dim, self._sampling, start, end)
def _create_observation(self, data, _, transformation_func: Callable, normalised: bool) -> xr.DataArray:
"""
Create observation as ground truth from given data.
Inverse transformation is applied to the ground truth to get the output in the original space.
:param data: observation
:param transformation_func: a callable function to apply inverse transformation
:param normalised: return the ground truth in the original space if false, or keep it normalised if true
:return: filled data array with observation
"""
if not normalised:
data = transformation_func(data, "target", inverse=True)
return data
def _create_ols_forecast(self, input_data: xr.DataArray, ols_prediction: xr.DataArray,
transformation_func: Callable, normalised: bool) -> xr.DataArray:
"""
Create ordinary least square model forecast with given input data.
Inverse transformation is applied to the forecast to get the output in the original space.
:param input_data: transposed history from DataPrep
:param ols_prediction: empty array in right shape to fill with data
:param transformation_func: a callable function to apply inverse transformation
:param normalised: transform prediction in original space if false, or use normalised predictions if true
:return: filled data array with ols predictions
"""
tmp_ols = self.ols_model.predict(input_data)
target_shape = ols_prediction.values.shape
if target_shape != tmp_ols.shape:
if len(target_shape) == 2:
new_values = np.swapaxes(tmp_ols, 1, 0)
else:
new_values = np.swapaxes(tmp_ols, 2, 0)
else:
new_values = tmp_ols
ols_prediction.values = new_values
if not normalised:
ols_prediction = transformation_func(ols_prediction, "target", inverse=True)
return ols_prediction
def _create_persistence_forecast(self, data, persistence_prediction: xr.DataArray, transformation_func: Callable,
normalised: bool) -> xr.DataArray:
"""
Create persistence forecast with given data.
The persistence forecast is derived from the value at t=0 and applied to all following time steps (t+1, ..., t+window).
Inverse transformation is applied to the forecast to get the output in the original space.
:param data: observation
:param persistence_prediction: empty array in right shape to fill with data
:param transformation_func: a callable function to apply inverse transformation
:param normalised: transform prediction in original space if false, or use normalised predictions if true
:return: filled data array with persistence predictions
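The core of the persistence forecast as a plain numpy sketch (mirroring the `np.tile` call below, with
illustrative numbers):

.. code-block:: python

    import numpy as np

    observation = np.array([3., 4., 5.])  # observed values at t=0 for three samples
    window_lead_time = 4
    # each observation is repeated for all lead times t+1, ..., t+window
    persistence = np.tile(observation, (window_lead_time, 1)).T
    # persistence.shape == (3, 4) and every row is constant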
"""
tmp_persi = data.copy()
persistence_prediction.values = np.tile(tmp_persi, (self.window_lead_time, 1)).T
if not normalised:
persistence_prediction = transformation_func(persistence_prediction, "target", inverse=True)
return persistence_prediction
def _create_nn_forecast(self, nn_output: xr.DataArray, nn_prediction: xr.DataArray, transformation_func: Callable,
normalised: bool) -> xr.DataArray:
"""
Create NN forecast for given input data.
Inverse transformation is applied to the forecast to get the output in the original space. Furthermore, only the
output of the main branch is returned (not all minor branches, if the network has multiple output branches). The
main branch is defined to be the last entry of all outputs.
:param nn_output: Full NN model output
:param nn_prediction: empty array in right shape to fill with data
:param transformation_func: a callable function to apply inverse transformation
:param normalised: transform prediction in original space if false, or use normalised predictions if true
:return: filled data array with nn predictions
"""
if isinstance(nn_output, list):
nn_prediction.values = nn_output[-1]
elif nn_output.ndim == 3:
nn_prediction.values = nn_output[-1, ...]
elif nn_output.ndim == 2:
nn_prediction.values = nn_output
else:
raise NotImplementedError(f"Number of dimensions of model output must be 2 or 3, but not {nn_output.ndim}.")
if not normalised:
nn_prediction = transformation_func(nn_prediction, base="target", inverse=True)
return nn_prediction
@staticmethod
def _create_empty_prediction_arrays(target_data, count=1):
"""
Create arrays to collect all predictions by returning `count` copies of the target data. """
return [target_data.copy() for _ in range(count)]
@staticmethod
def create_fullindex(df: Union[xr.DataArray, pd.DataFrame, pd.DatetimeIndex], freq: str) -> pd.DataFrame:
"""
Create full index from first and last date inside df and resample with given frequency.
:param df: use time range of this data set
:param freq: frequency of full index
:return: empty data frame with full index.
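Usage sketch:

.. code-block:: python

    import pandas as pd

    dates = pd.DatetimeIndex(["2010-01-01", "2010-01-05"])
    full = PostProcessing.create_fullindex(dates, "1D")
    # full is an empty DataFrame whose index covers 2010-01-01 to 2010-01-05 with daily frequency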
"""
if isinstance(df, pd.DataFrame):
earliest = df.index[0]
latest = df.index[-1]
elif isinstance(df, xr.DataArray):
earliest = df.index[0].values
latest = df.index[-1].values
elif isinstance(df, pd.DatetimeIndex):
earliest = df[0]
latest = df[-1]
else:
raise AttributeError(f"unknown array type. Only pandas dataframes, xarray dataarrays and pandas datetimes "
f"are supported. Given type is {type(df)}.")
index = pd.DataFrame(index=pd.date_range(earliest, latest, freq=freq))
return index
@staticmethod
def create_forecast_arrays(index: pd.DataFrame, ahead_names: List[Union[str, int]], time_dimension,
ahead_dim="ahead", index_dim="index", type_dim="type", **kwargs):
"""
Combine different forecast types into single xarray.
:param index: index for forecasts (e.g. time)
:param ahead_names: names of ahead values (e.g. hours or days)
:param kwargs: as xarrays; data of forecasts
:return: xarray with the 3 dimensions index, ahead, and prediction type
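A sketch of the expected call (shapes, names, and values are illustrative; each forecast is an xarray with a time
dimension named like `time_dimension`):

.. code-block:: python

    import numpy as np
    import pandas as pd
    import xarray as xr

    dates = pd.date_range("2010-01-01", periods=3, freq="1D")
    full_index = pd.DataFrame(index=dates)
    ahead_names = [1, 2]
    nn = xr.DataArray(np.random.random((3, 2)), coords=[dates, ahead_names], dims=["index", "window"])
    obs = xr.DataArray(np.random.random((3, 2)), coords=[dates, ahead_names], dims=["index", "window"])
    res = PostProcessing.create_forecast_arrays(full_index, ahead_names, "index", nn=nn, obs=obs)
    # res has the dimensions (index, ahead, type) with type coordinates ["nn", "obs"]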
"""
kwargs = {k: v for k, v in kwargs.items() if v is not None}
keys = list(kwargs.keys())
res = xr.DataArray(np.full((len(index.index), len(ahead_names), len(keys)), np.nan),
coords=[index.index, ahead_names, keys], dims=[index_dim, ahead_dim, type_dim])
for k, v in kwargs.items():
intersection = set(res.index.values) & set(v.indexes[time_dimension].values)
match_index = np.array(list(intersection))
res.loc[match_index, :, k] = v.loc[match_index]
return res
def _get_internal_data(self, station: str, path: str) -> Union[xr.DataArray, None]:
"""
Get internal data for given station.
Internal data is defined as data that is already known to the model. From an evaluation perspective, this refers
to data that is not test data, i.e. the train and val data.
:param station: name of station to load internal data.
"""
try:
file = os.path.join(path, f"forecasts_{str(station)}_train_val.nc")
with xr.open_dataarray(file) as da:
return da.load()
except (IndexError, KeyError, FileNotFoundError):
return None
def _get_external_data(self, station: str, path: str) -> Union[xr.DataArray, None]:
"""
Get external data for given station.
External data is defined as data that is not known to the model. From an evaluation perspective, this refers to
data that is neither train nor val data, i.e. the test data.
:param station: name of station to load external data.
"""
try:
file = os.path.join(path, f"forecasts_{str(station)}_test.nc")
with xr.open_dataarray(file) as da:
return da.load()
except (IndexError, KeyError, FileNotFoundError):
return None
def _combine_forecasts(self, forecast, competitor, dim=None):
"""
Combine forecast and competitor if both are xarrays. If the competitor is None, the forecast is returned, and
vice versa.
"""
if dim is None:
dim = self.model_type_dim
try:
return xr.concat([forecast, competitor], dim=dim)
except (TypeError, AttributeError):
return forecast if competitor is None else competitor
def calculate_error_metrics(self) -> Tuple[Dict, Dict, Dict, Dict]:
"""
Calculate error metrics and skill scores of NN forecast.
The competitive skill score compares the NN prediction with persistence and ordinary least squares forecasts.
The climatological skill scores, in contrast, evaluate the NN prediction in terms of meaningfulness in comparison
to different climatological reference forecasts.
:return: competitive and climatological skill scores, error metrics
"""
all_stations = self.data_store.get("stations")
skill_score_competitive = {}
skill_score_competitive_count = {}
skill_score_climatological = {}
errors = {}
for station in all_stations:
external_data = self._get_external_data(station, self.forecast_path) # test data
# test errors
if external_data is not None:
external_data.coords[self.model_type_dim] = [{self.forecast_indicator: self.model_display_name}.get(n, n)
for n in external_data.coords[self.model_type_dim].values]
model_type_list = external_data.coords[self.model_type_dim].values.tolist()
for model_type in remove_items(model_type_list, self.observation_indicator):
if model_type not in errors.keys():
errors[model_type] = {}
errors[model_type][station] = statistics.calculate_error_metrics(
*map(lambda x: external_data.sel(**{self.model_type_dim: x}),
[model_type, self.observation_indicator]), dim=self.index_dim)
# load competitors
competitor = self.load_competitors(station)
combined = self._combine_forecasts(external_data, competitor, dim=self.model_type_dim)
if combined is not None:
model_list = remove_items(combined.coords[self.model_type_dim].values.tolist(),
self.observation_indicator)
else:
model_list = None
# test errors of competitors
for model_type in (model_list or []):
if self.observation_indicator not in combined.coords[self.model_type_dim]:
continue
if model_type not in errors.keys():
errors[model_type] = {}
errors[model_type][station] = statistics.calculate_error_metrics(
*map(lambda x: combined.sel(**{self.model_type_dim: x}),
[model_type, self.observation_indicator]), dim=self.index_dim)
# skill score
skill_score = statistics.SkillScores(combined, models=model_list, ahead_dim=self.ahead_dim,
type_dim=self.model_type_dim, index_dim=self.index_dim)
if external_data is not None:
skill_score_competitive[station], skill_score_competitive_count[station] = skill_score.skill_scores()
internal_data = self._get_internal_data(station, self.forecast_path)
if internal_data is not None:
skill_score_climatological[station] = skill_score.climatological_skill_scores(
internal_data, forecast_name=self.forecast_indicator)
for model_type in errors.keys():
errors[model_type].update({"total": self.calculate_average_errors(errors[model_type])})
skill_score_competitive.update({"total": self.calculate_average_skill_scores(skill_score_competitive,
skill_score_competitive_count)})
return skill_score_competitive, skill_score_competitive_count, skill_score_climatological, errors
@staticmethod
def calculate_average_skill_scores(scores, counts):
avg_skill_score = 0
n_total = None
for vals in counts.values():
n_total = vals if n_total is None else n_total.add(vals, fill_value=0)
for station, station_scores in scores.items():
n = counts.get(station)
avg_skill_score = station_scores.mul(n.div(n_total, fill_value=0), fill_value=0).add(avg_skill_score,
fill_value=0)
return avg_skill_score
@staticmethod
def calculate_average_errors(errors):
avg_error = {}
n_total = sum([x.get("n", 0) for _, x in errors.items()])
for station, station_errors in errors.items():
n_station = station_errors.get("n")
for error_metric, val in station_errors.items():
new_val = avg_error.get(error_metric, 0) + val * n_station / n_total
avg_error[error_metric] = new_val
return avg_error
def report_feature_importance_results(self, results):
"""Create a csv file containing all results from feature importance."""
report_path = os.path.join(self.data_store.get("experiment_path"), "latex_report")
path_config.check_path_and_create(report_path)
res = []
max_cols = 0
for boot_type, d0 in results.items():
for boot_method, d1 in d0.items():
for station_name, vals in d1.items():
for boot_var in vals.coords[self.boot_var_dim].values.tolist():
for ahead in vals.coords[self.ahead_dim].values.tolist():
res.append([boot_type, boot_method, station_name, boot_var, ahead,
*vals.sel({self.boot_var_dim: boot_var,
self.ahead_dim: ahead}).values.round(5).tolist()])
max_cols = max(max_cols, len(res[-1]))
col_names = [self.model_type_dim, "method", "station", self.boot_var_dim, self.ahead_dim,
*list(range(max_cols - 5))]
df = pd.DataFrame(res, columns=col_names)
file_name = "feature_importance_skill_score_report_raw.csv"
df.to_csv(os.path.join(report_path, file_name), sep=";")
def report_error_metrics(self, errors):
report_path = os.path.join(self.data_store.get("experiment_path"), "latex_report")
path_config.check_path_and_create(report_path)
for model_type in errors.keys():
metric_collection = {}
for station, station_errors in errors[model_type].items():
if isinstance(station_errors, xr.DataArray):
dim = station_errors.dims[0]
sel_index = [sel for sel in station_errors.coords[dim] if "CASE" in str(sel)]
station_errors = {str(i.values): station_errors.sel(**{dim: i}) for i in sel_index}
elif isinstance(station_errors, pd.DataFrame):
sel_index = station_errors.index.tolist()
ahead = station_errors.columns.values
station_errors = {k: xr.DataArray(station_errors[station_errors.index == k].values.flatten(),
dims=["ahead"], coords={"ahead": ahead}).astype(float)
for k in sel_index}
for metric, vals in station_errors.items():
if metric == "n":
metric = "count"
pd_vals = pd.DataFrame.from_dict({station: vals}).T
pd_vals.columns = [f"{metric}(t+{x})" for x in vals.coords["ahead"].values]
mc = metric_collection.get(metric, pd.DataFrame())
mc = mc.append(pd_vals)
metric_collection[metric] = mc
for metric, error_df in metric_collection.items():
df = error_df.sort_index()
if "total" in df.index:
df = df.reindex(df.index.drop(["total"]).to_list() + ["total"])
column_format = tables.create_column_format_for_tex(df)
if model_type == "skill_score":
file_name = f"error_report_{model_type}_{metric}.%s".replace(' ', '_').replace('/', '_')
else:
file_name = f"error_report_{metric}_{model_type}.%s".replace(' ', '_').replace('/', '_')
tables.save_to_tex(report_path, file_name % "tex", column_format=column_format, df=df)
tables.save_to_md(report_path, file_name % "md", df=df)