
Resolve "release v1.4.0"

Merged Ghost User requested to merge release_v1.4.0 into master
1 file  +54 −9
@@ -13,9 +13,10 @@ import numpy as np
import pandas as pd
import xarray as xr
from mlair.configuration import path_config
from mlair.data_handler import BootStraps, KerasIterator
from mlair.helpers.datastore import NameNotFoundInDataStore
-from mlair.helpers import TimeTracking, statistics, extract_value, remove_items, to_list
+from mlair.helpers import TimeTracking, statistics, extract_value, remove_items, to_list, tables
from mlair.model_modules.linear_model import OrdinaryLeastSquaredModel
from mlair.model_modules import AbstractModelClass
from mlair.plotting.postprocessing_plotting import PlotMonthlySummary, PlotStationMap, PlotClimatologicalSkillScore, \
@@ -102,9 +103,11 @@ class PostProcessing(RunEnvironment):
        create_new_bootstraps = self.data_store.get("create_new_bootstraps", "postprocessing")
        self.bootstrap_postprocessing(create_new_bootstraps)
-        # skill scores
+        # skill scores and error metrics
        with TimeTracking(name="calculate skill scores"):
-            self.skill_scores = self.calculate_skill_scores()
+            skill_score_competitive, skill_score_climatological, errors = self.calculate_error_metrics()
+            self.skill_scores = (skill_score_competitive, skill_score_climatological)
+            self.report_error_metrics(errors)
        # plotting
        self.plot()
@@ -386,8 +389,10 @@ class PostProcessing(RunEnvironment):
    def calculate_test_score(self):
        """Evaluate test score of model and save locally."""
        # test scores on transformed data
        test_score = self.model.evaluate_generator(generator=self.test_data_distributed,
-                                                   use_multiprocessing=True, verbose=0, steps=1)
+                                                   use_multiprocessing=True, verbose=0)
        path = self.data_store.get("model_path")
        with open(os.path.join(path, "test_scores.txt"), "a") as f:
            for index, item in enumerate(to_list(test_score)):
@@ -656,22 +661,29 @@ class PostProcessing(RunEnvironment):
        except (TypeError, AttributeError):
            return forecast if competitor is None else competitor

-    def calculate_skill_scores(self) -> Tuple[Dict, Dict]:
+    def calculate_error_metrics(self) -> Tuple[Dict, Dict, Dict]:
        """
-        Calculate skill scores of NN forecast.
+        Calculate error metrics and skill scores of NN forecast.
        The competitive skill score compares the NN prediction with persistence and ordinary least squares forecasts,
        whereas the climatological skill score evaluates the NN prediction in terms of meaningfulness in comparison
        to different climatological references.
-        :return: competitive and climatological skill scores
+        :return: competitive and climatological skill scores, error metrics
        """
        path = self.data_store.get("forecast_path")
        all_stations = self.data_store.get("stations")
        skill_score_competitive = {}
        skill_score_climatological = {}
+        errors = {}
        for station in all_stations:
-            external_data = self._get_external_data(station, path)
+            external_data = self._get_external_data(station, path)  # test data
+            # test errors
+            errors[station] = statistics.calculate_error_metrics(*map(lambda x: external_data.sel(type=x),
+                                                                      [self.forecast_indicator, "obs"]),
+                                                                 dim="index")
+            # skill score
            competitor = self.load_competitors(station)
            combined = self._combine_forecasts(external_data, competitor, dim="type")
            model_list = remove_items(list(combined.type.values), "obs") if combined is not None else None
@@ -683,4 +695,37 @@ class PostProcessing(RunEnvironment):
            if internal_data is not None:
                skill_score_climatological[station] = skill_score.climatological_skill_scores(
                    internal_data, self.window_lead_time, forecast_name=self.forecast_indicator)
-        return skill_score_competitive, skill_score_climatological
+        errors.update({"total": self.calculate_average_errors(errors)})
+        return skill_score_competitive, skill_score_climatological, errors
+    @staticmethod
+    def calculate_average_errors(errors):
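+        """
+        Average the error metrics over all stations, weighting each station by its sample count ``n``.
+
+        Sketch with hypothetical numbers: for ``{"A": {"mae": 1.0, "n": 100}, "B": {"mae": 3.0, "n": 300}}``
+        the averaged mae is (1.0 * 100 + 3.0 * 300) / 400 = 2.5.
+        """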
+        avg_error = {}
+        n_total = sum([x.get("n", 0) for _, x in errors.items()])
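+        # weighted mean over stations: avg = sum_s(n_s * e_s) / sum_s(n_s)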
+        for station, station_errors in errors.items():
+            n_station = station_errors.get("n")
+            for error_metric, val in station_errors.items():
+                new_val = avg_error.get(error_metric, 0) + val * n_station / n_total
+                avg_error[error_metric] = new_val
+        return avg_error
+    def report_error_metrics(self, errors):
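+        """Write the collected error metrics as LaTeX and Markdown tables (one file per metric) to the latex_report folder."""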
+        report_path = os.path.join(self.data_store.get("experiment_path"), "latex_report")
+        path_config.check_path_and_create(report_path)
+        metric_collection = {}
+        for station, station_errors in errors.items():
+            for metric, vals in station_errors.items():
+                if metric == "n":
+                    continue
+                pd_vals = pd.DataFrame.from_dict({station: vals}).T
+                pd_vals.columns = [f"{metric}(t+{x})" for x in vals.coords["ahead"].values]
+                mc = metric_collection.get(metric, pd.DataFrame())
+                mc = mc.append(pd_vals)
+                metric_collection[metric] = mc
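+        # one table per metric: stations as rows, lead times (t+1, t+2, ...) as columns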
+        for metric, error_df in metric_collection.items():
+            df = error_df.sort_index()
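+            # move the "total" row (average over all stations) to the bottom of the table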
+            df = df.reindex(df.index.drop(["total"]).to_list() + ["total"])
+            column_format = tables.create_column_format_for_tex(df)
+            tables.save_to_tex(report_path, f"error_report_{metric}.tex", column_format=column_format, df=df)
+            tables.save_to_md(report_path, f"error_report_{metric}.md", df=df)