__author__ = "Lukas Leufen, Felix Kleinert"
__date__ = '2019-12-11'
import logging
import os
import numpy as np
import pandas as pd
import xarray as xr
import statsmodels.api as sm
import keras
from scipy import stats
from src.run_modules.run_environment import RunEnvironment
from src.data_handling.data_distributor import Distributor
from src.data_handling.data_generator import DataGenerator
from src.model_modules.linear_model import OrdinaryLeastSquaredModel
from src import statistics
from src import helpers
from src.helpers import TimeTracking
from src.plotting.postprocessing_plotting import plot_monthly_summary, plot_climsum_boxplot, plot_station_map, plot_conditional_quantiles
from src.datastore import NameNotFoundInDataStore
class PostProcessing(RunEnvironment):
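"""
Post-processing of a trained model: loads the best model, creates CNN, persistence and OLS forecasts
for the test data, calculates general and climatological skill scores and runs the plotting routines.
"""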
def __init__(self):
super().__init__()
self.model: keras.Model = self._load_model()
self.ols_model = None
self.batch_size: int = self.data_store.get("batch_size", "general.model")
self.test_data: DataGenerator = self.data_store.get("generator", "general.test")
self.test_data_distributed = Distributor(self.test_data, self.model, self.batch_size)
self.train_data: DataGenerator = self.data_store.get("generator", "general.train")
self.train_val_data: DataGenerator = self.data_store.get("generator", "general.train_val")
self.plot_path: str = self.data_store.get("plot_path", "general")
self.skill_scores = None
# self._run()
def _run(self):
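"""Run all post-processing steps: train the OLS reference, create all forecasts, calculate skill scores and plot."""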
self.train_ols_model()
preds_for_all_stations = self.make_prediction()
self.skill_scores = self.calculate_skill_scores()  # TODO: stopped here, continue with the plot routine. Decide whether the skill scores should be saved locally and loaded for plotting, or whether self.skill_scores should be used directly.
self.plot()
def _load_model(self):
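"""
Load the best model from the data store. If no model is stored there, fall back to the model file
<experiment_name>_my_model.h5 saved in the experiment path.
:return: the loaded keras model
"""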
try:
model = self.data_store.get("best_model", "general")
except NameNotFoundInDataStore:
logging.info("no model saved in data store. trying to load model from experiment")
path = self.data_store.get("experiment_path", "general")
name = f"{self.data_store.get('experiment_name', 'general')}_my_model.h5"
model_name = os.path.join(path, name)
model = keras.models.load_model(model_name)
return model
def plot(self):
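"""
Run the plotting routines: conditional quantile plots (cali-ref and like-bas), the station map and
the monthly summary plot.
"""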
logging.debug("Run plotting routines...")
path = self.data_store.get("forecast_path", "general")
window_lead_time = self.data_store.get("window_lead_time", "general")
target_var = self.data_store.get("target_var", "general")
plot_conditional_quantiles(self.test_data.stations, plot_folder=self.plot_path, pred_name="CNN", ref_name="orig",
forecast_path=path, plot_name_affix="cali-ref")
plot_conditional_quantiles(self.test_data.stations, plot_folder=self.plot_path, pred_name="orig", ref_name="CNN",
forecast_path=path, plot_name_affix="like-bas")
plot_station_map(generators={'b': self.test_data}, plot_folder=self.plot_path)
plot_monthly_summary(self.test_data.stations, path, r"forecasts_%s_test.nc", window_lead_time, target_var,
plot_folder=self.plot_path)
# plot_climsum_boxplot()
# FKf.ss_climsum_boxplot(data=ss_clim_dic, modelsetup=modelsetup, score_only=True, **kwargs)
# FKf.ss_climsum_boxplot(data=ss_clim_dic, modelsetup=modelsetup, score_only=False, extra_nametag='_all_terms', **kwargs)
# FKf.ss_sum_boxplot(ss_dic, plot_path, modelsetup)
def calculate_test_score(self):
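"""Evaluate the model on the distributed test data and save the resulting test score."""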
test_score = self.model.evaluate_generator(generator=self.test_data_distributed.distribute_on_batches(),
use_multiprocessing=False, verbose=0, steps=1)
logging.info(f"test score = {test_score}")
self._save_test_score(test_score)
def _save_test_score(self, score):
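"""Write the test score to test_scores.txt in the experiment path, one line per metric."""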
path = self.data_store.get("experiment_path", "general")
with open(os.path.join(path, "test_scores.txt"), "w") as f:  # write mode is required, the scores are written below
for index, item in enumerate(score):
f.write(f"{self.model.metrics[index]}, {item}\n")
def train_ols_model(self):
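"""Fit an ordinary least squares model on the train data as a reference forecast."""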
self.ols_model = OrdinaryLeastSquaredModel(self.train_data)
def make_prediction(self, freq="1D"):
logging.debug("start make_prediction")
nn_prediction_all_stations = []
for i, v in enumerate(self.test_data):
data = self.test_data.get_data_generator(i)
nn_prediction, persistence_prediction, ols_prediction = self._create_empty_prediction_arrays(data, count=3)
input_data = self.test_data[i][0]
# get scaling parameters
mean, std, transformation_method = data.get_transformation_information(variable='o3')
# nn forecast
nn_prediction = self._create_nn_forecast(input_data, nn_prediction, mean, std, transformation_method)
# persistence
persistence_prediction = self._create_persistence_forecast(input_data, persistence_prediction, mean, std,
transformation_method)
# ols
ols_prediction = self._create_ols_forecast(input_data, ols_prediction, mean, std, transformation_method)
# orig pred
orig_pred = self._create_orig_forecast(data, None, mean, std, transformation_method)
# merge all predictions
full_index = self.create_fullindex(data.data.indexes['datetime'], freq)
all_predictions = self.create_forecast_arrays(full_index, list(data.label.indexes['window']),
CNN=nn_prediction,
persi=persistence_prediction,
orig=orig_pred,
OLS=ols_prediction)
# save all forecasts locally
path = self.data_store.get("forecast_path", "general")
file = os.path.join(path, f"forecasts_{data.station[0]}_test.nc")
all_predictions.to_netcdf(file)
# save nn forecast to return variable
nn_prediction_all_stations.append(nn_prediction)
return nn_prediction_all_stations
@staticmethod
def _create_orig_forecast(data, _, mean, std, transformation_method):
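"""Transform the labels (observations) back to the original data range."""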
return statistics.apply_inverse_transformation(data.label, mean, std, transformation_method)
def _create_ols_forecast(self, input_data, ols_prediction, mean, std, transformation_method):
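"""Create the OLS forecast and transform it back to the original data range."""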
tmp_ols = self.ols_model.predict(input_data)
tmp_ols = statistics.apply_inverse_transformation(tmp_ols, mean, std, transformation_method)
ols_prediction.values = np.swapaxes(np.expand_dims(tmp_ols, axis=1), 2, 0)
return ols_prediction
def _create_persistence_forecast(self, input_data, persistence_prediction, mean, std, transformation_method):
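"""
Use the latest observed o3 value (window=0) as persistence forecast and repeat it for all lead times
given by window_lead_time.
"""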
tmp_persi = input_data.sel({'window': 0, 'variables': 'o3'})
tmp_persi = statistics.apply_inverse_transformation(tmp_persi, mean, std, transformation_method)
window_lead_time = self.data_store.get("window_lead_time", "general")
persistence_prediction.values = np.expand_dims(np.tile(tmp_persi.squeeze('Stations'), (window_lead_time, 1)),
axis=1)
return persistence_prediction
def _create_nn_forecast(self, input_data, nn_prediction, mean, std, transformation_method):
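"""Create the neural network forecast and transform it back to the original data range."""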
tmp_nn = self.model.predict(input_data)
tmp_nn = statistics.apply_inverse_transformation(tmp_nn, mean, std, transformation_method)
nn_prediction.values = np.swapaxes(np.expand_dims(tmp_nn, axis=1), 2, 0)
return nn_prediction
@staticmethod
def _create_empty_prediction_arrays(generator, count=1):
# use a comprehension so that each forecast gets its own copy; multiplying the list
# would alias the same array count times
return [generator.label.copy() for _ in range(count)]
@staticmethod
def create_fullindex(df, freq):
"""
Create an empty DataFrame with a DatetimeIndex of frequency freq, spanning the range between the
first and the last date in df.
:param df: pandas DataFrame, xarray DataArray or pandas DatetimeIndex to take the date range from
:param freq: frequency of the new index as string (e.g. "1D")
:return: empty DataFrame carrying the full index
"""
if isinstance(df, pd.DataFrame):
earliest = df.index[0]
latest = df.index[-1]
elif isinstance(df, xr.DataArray):
earliest = df.index[0].values
latest = df.index[-1].values
elif isinstance(df, pd.core.indexes.datetimes.DatetimeIndex):
earliest = df[0]
latest = df[-1]
else:
raise AttributeError(f"Unknown array type. Only pandas DataFrames, xarray DataArrays and pandas "
f"DatetimeIndex are supported. Given type is {type(df)}.")
index = pd.DataFrame(index=pd.date_range(earliest, latest, freq=freq))
return index
@staticmethod
def create_forecast_arrays(index, ahead_names, **kwargs):
"""
This function combines different forecast types into one xarray.
:param index: as index; index for forecasts (e.g. time)
:param ahead_names: as list of str/int: names of ahead values (e.g. hours or days)
:param kwargs: as xarrays; data of forecasts
:return: xarray of dimension 3: index, ahead_names, # predictions
"""
keys = list(kwargs.keys())
res = xr.DataArray(np.full((len(index.index), len(ahead_names), len(keys)), np.nan),
coords=[index.index, ahead_names, keys], dims=['index', 'ahead', 'type'])
for k, v in kwargs.items():
try:
match_index = np.stack(set(res.index.values) & set(v.index.values))
res.loc[match_index, :, k] = v.loc[match_index]
except AttributeError: # v is xarray type and has no attribute .index
match_index = np.stack(set(res.index.values) & set(v.indexes['datetime'].values))
res.loc[match_index, :, k] = v.sel({'datetime': match_index}).squeeze('Stations').transpose()
return res
def _get_external_data(self, station):
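"""
Load the observations of a single station from the train_val data as external reference for the
climatological skill scores. Returns None if the station is not available in the train_val generator.
"""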
try:
data = self.train_val_data.get_data_generator(station)
mean, std, transformation_method = data.get_transformation_information(variable='o3')
external_data = self._create_orig_forecast(data, None, mean, std, transformation_method)
external_data = external_data.squeeze("Stations").sel(window=1).drop(["window", "Stations", "variables"])
return external_data.rename({'datetime': 'index'})
except KeyError:
return None
def calculate_skill_scores(self, threshold=60):
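"""
Calculate general and climatological skill scores for all test stations based on the locally saved
forecast files.
:param threshold: currently not used
:return: two dictionaries (general, climatological) with one entry per station
"""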
path = self.data_store.get("forecast_path", "general")
window_lead_time = self.data_store.get("window_lead_time", "general")
skill_score_general = {}
skill_score_climatological = {}
for station in self.test_data.stations: # TODO: replace this by a more general approach to also calculate on train/val
file = os.path.join(path, f"forecasts_{station}_test.nc")
data = xr.open_dataarray(file)
ss = SkillScores(data)
external_data = self._get_external_data(station)
skill_score_general[station] = ss.skill_scores(window_lead_time)
skill_score_climatological[station] = ss.climatological_skill_scores(external_data, window_lead_time)
return skill_score_general, skill_score_climatological
class SkillScores(RunEnvironment):
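"""
Calculate skill scores of a forecast against reference forecasts (persistence, OLS) and against
climatological references (cases I to IV), including all single terms of the decomposition.
"""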
def __init__(self, internal_data):
super().__init__()
self.internal_data = internal_data
def skill_scores(self, window_lead_time):
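"""
Calculate the skill scores CNN vs persistence, OLS vs persistence and CNN vs OLS for each lead time.
:param window_lead_time: number of ahead values
:return: DataFrame with one column per lead time
"""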
ahead_names = list(range(1, window_lead_time + 1))
skill_score = pd.DataFrame(index=['cnn-persi', 'ols-persi', 'cnn-ols'])
for iahead in ahead_names:
data = self.internal_data.sel(ahead=iahead)
skill_score[iahead] = [self.general_skill_score(data),
self.general_skill_score(data, forecast_name="OLS"),
self.general_skill_score(data, reference_name="OLS")]
return skill_score
def climatological_skill_scores(self, external_data, window_lead_time):
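"""
Calculate the climatological skill scores for the cases I to IV and all their single terms for each
lead time. Cases III and IV require external data and are skipped if external_data is None.
"""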
ahead_names = list(range(1, window_lead_time + 1))
all_terms = ['AI', 'AII', 'AIII', 'AIV', 'BI', 'BII', 'BIV', 'CI', 'CIV', 'CASE I', 'CASE II', 'CASE III',
'CASE IV']
skill_score = xr.DataArray(np.full((len(all_terms), len(ahead_names)), np.nan),
coords=[all_terms, ahead_names], dims=['terms', 'ahead'])  # one coordinate per dimension
for iahead in ahead_names:
data = self.internal_data.sel(ahead=iahead)
skill_score.loc[["CASE I", "AI", "BI", "CI"], iahead] = np.stack(self._climatological_skill_score(
data, mu_type=1, forecast_name="CNN").values.flatten())
skill_score.loc[["CASE II", "AII", "BII"], iahead] = np.stack(self._climatological_skill_score(
data, mu_type=2, forecast_name="CNN").values.flatten())
if external_data is not None:
skill_score.loc[["CASE III", "AIII"], iahead] = np.stack(self._climatological_skill_score(
data, mu_type=3, forecast_name="CNN",
external_data=external_data).values.flatten())
skill_score.loc[["CASE IV", "AIV", "BIV", "CIV"], iahead] = np.stack(self._climatological_skill_score(
data, mu_type=4, forecast_name="CNN",
external_data=external_data).values.flatten())
return skill_score
def _climatological_skill_score(self, data, mu_type=1, observation_name="orig", forecast_name="CNN", external_data=None):
kwargs = {"external_data": external_data} if external_data is not None else {}
return self.__getattribute__(f"skill_score_mu_case_{mu_type}")(data, observation_name, forecast_name, **kwargs)
@staticmethod
def general_skill_score(data, observation_name="orig", forecast_name="CNN", reference_name="persi"):
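"""
Calculate the general skill score of a forecast against a reference forecast:
    skill_score = 1 - mse(observation, forecast) / mse(observation, reference)
All time steps containing missing values are dropped beforehand.
"""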
data = data.dropna("index")
observation = data.loc[..., observation_name]
forecast = data.loc[..., forecast_name]
reference = data.loc[..., reference_name]
mse = statistics.mean_squared_error
skill_score = 1 - mse(observation, forecast) / mse(observation, reference)
return skill_score
@staticmethod
def skill_score_pre_calculations(data, observation_name, forecast_name):
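"""
Clean up the data (drop missing values) and calculate the terms AI (squared spearman correlation of
forecast and observation), BI and CI as well as mean, variance and the correlation itself, which are
reused by all climatological skill score cases.
"""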
data = data.loc[..., [observation_name, forecast_name]].drop("ahead")
data = data.dropna("index")
mean = data.mean("index")
var = data.var("index")
r, p = stats.spearmanr(data.loc[..., [forecast_name, observation_name]])
AI = np.array(r ** 2)
BI = ((r - var.loc[..., forecast_name] / var.loc[..., observation_name]) ** 2).values
CI = (((mean.loc[..., forecast_name] - mean.loc[..., observation_name]) / var.loc[
..., observation_name]) ** 2).values
suffix = {"mean": mean, "var": var, "r": r, "p": p}
return AI, BI, CI, data, suffix
def skill_score_mu_case_1(self, data, observation_name="orig", forecast_name="CNN"):
AI, BI, CI, data, _ = self.skill_score_pre_calculations(data, observation_name, forecast_name)
skill_score = np.array(AI - BI - CI)
return pd.DataFrame({"skill_score": [skill_score], "AI": [AI], "BI": [BI], "CI": [CI]}).to_xarray().to_array()
def skill_score_mu_case_2(self, data, observation_name="orig", forecast_name="CNN"):
AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name)
monthly_mean = self.create_monthly_mean_from_daily_data(data)
data = xr.concat([data, monthly_mean], dim="type")
var = data.var("index")
r, p = stats.spearmanr(data.loc[..., [observation_name, observation_name + "X"]])
AII = np.array(r ** 2)
BII = ((r - var.loc[observation_name + 'X'] / var.loc[observation_name]) ** 2).values
skill_score = np.array((AI - BI - CI - AII + BII) / (1 - AII + BII))
return pd.DataFrame({"skill_score": [skill_score], "AII": [AII], "BII": [BII]}).to_xarray().to_array()
def skill_score_mu_case_3(self, data, observation_name="orig", forecast_name="CNN", external_data=None):
AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name)
mean, var = suffix["mean"], suffix["var"]
AIII = (((external_data.mean().values - mean.loc[observation_name]) / var.loc[observation_name])**2).values
skill_score = np.array((AI - BI - CI + AIII) / (1 + AIII))  # the denominator needs parentheses, cf. cases 2 and 4
return pd.DataFrame({"skill_score": [skill_score], "AIII": [AIII]}).to_xarray().to_array()
def skill_score_mu_case_4(self, data, observation_name="orig", forecast_name="CNN", external_data=None):
AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name)
monthly_mean_external = self.create_monthly_mean_from_daily_data(external_data, columns=data.type.values)
data = xr.concat([data, monthly_mean_external], dim="type")
mean = data.mean("index")
var = data.var("index")
r_mu, p_mu = stats.spearmanr(data.loc[..., [observation_name, observation_name+'X']])
AIV = np.array(r_mu**2)
BIV = ((r_mu - var.loc[observation_name + 'X'] / var.loc[observation_name])**2).values
CIV = (((mean.loc[observation_name + 'X'] - mean.loc[observation_name]) / var.loc[observation_name])**2).values
skill_score = np.array((AI - BI - CI - AIV + BIV + CIV) / (1 - AIV + BIV + CIV))
return pd.DataFrame({"skill_score": [skill_score], "AIV": [AIV], "BIV": [BIV], "CIV": CIV}).to_xarray().to_array()
@staticmethod
def create_monthly_mean_from_daily_data(data, columns=None):
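"""
Calculate monthly means from daily data and map them back onto the daily index. The resulting types
are named after the input columns with an appended "X".
:param data: xarray with daily data
:param columns: columns to use (defaults to all types in data)
:return: xarray of monthly mean values on the daily index
"""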
if columns is None:
columns = data.type.values
coordinates = [data.index, [v + "X" for v in list(columns)]] # TODO
empty_data = np.full((len(data.index), len(columns)), np.nan)
monthly_mean = xr.DataArray(empty_data, coords=coordinates, dims=["index", "type"])
mu = data.groupby("index.month").mean()
for month in mu.month:
monthly_mean[monthly_mean.index.dt.month == month, :] = mu[mu.month == month].values
return monthly_mean