__author__ = "Lukas Leufen, Felix Kleinert" __date__ = '2019-12-11' import logging import os import keras import numpy as np import pandas as pd import xarray as xr from src import statistics from src.data_handling.data_distributor import Distributor from src.data_handling.data_generator import DataGenerator from src.data_handling.bootstraps import BootStraps from src.datastore import NameNotFoundInDataStore from src.helpers import TimeTracking from src.model_modules.linear_model import OrdinaryLeastSquaredModel from src.plotting.postprocessing_plotting import PlotMonthlySummary, PlotStationMap, PlotClimatologicalSkillScore, \ PlotCompetitiveSkillScore, PlotTimeSeries, PlotBootstrapSkillScore from src.plotting.postprocessing_plotting import plot_conditional_quantiles from src.run_modules.run_environment import RunEnvironment class PostProcessing(RunEnvironment): def __init__(self): super().__init__() self.model: keras.Model = self._load_model() self.ols_model = None self.batch_size: int = self.data_store.get_default("batch_size", "general.model", 64) self.test_data: DataGenerator = self.data_store.get("generator", "general.test") self.test_data_distributed = Distributor(self.test_data, self.model, self.batch_size) self.train_data: DataGenerator = self.data_store.get("generator", "general.train") self.train_val_data: DataGenerator = self.data_store.get("generator", "general.train_val") self.plot_path: str = self.data_store.get("plot_path", "general") self.target_var = self.data_store.get("target_var", "general") self._sampling = self.data_store.get("sampling", "general") self.skill_scores = None self.bootstrap_skill_scores = None self._run() def _run(self): with TimeTracking(): self.train_ols_model() logging.info("take a look on the next reported time measure. If this increases a lot, one should think to " "skip train_ols_model() whenever it is possible to save time.") with TimeTracking(): self.make_prediction() logging.info("take a look on the next reported time measure. If this increases a lot, one should think to " "skip make_prediction() whenever it is possible to save time.") self.bootstrap_skill_scores = self.create_boot_straps() self.skill_scores = self.calculate_skill_scores() self.plot() def create_boot_straps(self): # forecast bootstrap_path = self.data_store.get("bootstrap_path", "general") forecast_path = self.data_store.get("forecast_path", "general") window_lead_time = self.data_store.get("window_lead_time", "general") bootstraps = BootStraps(self.test_data, bootstrap_path, 20) with TimeTracking(name="boot predictions"): bootstrap_predictions = self.model.predict_generator(generator=bootstraps.boot_strap_generator(), steps=bootstraps.get_boot_strap_generator_length()) bootstrap_meta = np.array(bootstraps.get_boot_strap_meta()) variables = np.unique(bootstrap_meta[:, 0]) for station in np.unique(bootstrap_meta[:, 1]): coords = None for boot in variables: ind = np.all(bootstrap_meta == [boot, station], axis=1) length = sum(ind) sel = bootstrap_predictions[ind].reshape((length, window_lead_time, 1)) coords = (range(length), range(1, window_lead_time + 1)) tmp = xr.DataArray(sel, coords=(*coords, [boot]), dims=["index", "ahead", "type"]) file_name = os.path.join(forecast_path, f"bootstraps_{boot}_{station}.nc") tmp.to_netcdf(file_name) labels = bootstraps.get_labels(station).reshape((length, window_lead_time, 1)) file_name = os.path.join(forecast_path, f"bootstraps_labels_{station}.nc") labels = xr.DataArray(labels, coords=(*coords, ["obs"]), dims=["index", "ahead", "type"]) labels.to_netcdf(file_name) # file_name = os.path.join(forecast_path, f"bootstraps_orig.nc") # orig = xr.open_dataarray(file_name) # calc skill scores skill_scores = statistics.SkillScores(None) score = {} for station in np.unique(bootstrap_meta[:, 1]): file_name = os.path.join(forecast_path, f"bootstraps_labels_{station}.nc") labels = xr.open_dataarray(file_name) shape = labels.shape orig = bootstraps.get_orig_prediction(forecast_path, f"forecasts_norm_{station}_test.nc").reshape(shape) orig = xr.DataArray(orig, coords=(range(shape[0]), range(1, shape[1] + 1), ["orig"]), dims=["index", "ahead", "type"]) skill = pd.DataFrame(columns=range(1, window_lead_time + 1)) for boot in variables: file_name = os.path.join(forecast_path, f"bootstraps_{boot}_{station}.nc") boot_data = xr.open_dataarray(file_name) boot_data = boot_data.combine_first(labels) boot_data = boot_data.combine_first(orig) boot_scores = [] for iahead in range(window_lead_time): data = boot_data.sel(ahead=iahead + 1) boot_scores.append(skill_scores.general_skill_score(data, forecast_name=boot, reference_name="orig")) skill.loc[boot] = np.array(boot_scores) score[station] = xr.DataArray(skill, dims=["boot_var", "ahead"]) return score def _load_model(self): try: model = self.data_store.get("best_model", "general") except NameNotFoundInDataStore: logging.info("no model saved in data store. trying to load model from experiment path") model_name = self.data_store.get("model_name", "general.model") model = keras.models.load_model(model_name) return model def plot(self): logging.debug("Run plotting routines...") path = self.data_store.get("forecast_path", "general") plot_conditional_quantiles(self.test_data.stations, pred_name="CNN", ref_name="obs", forecast_path=path, plot_name_affix="cali-ref", plot_folder=self.plot_path) plot_conditional_quantiles(self.test_data.stations, pred_name="obs", ref_name="CNN", forecast_path=path, plot_name_affix="like-bas", plot_folder=self.plot_path) PlotStationMap(generators={'b': self.test_data}, plot_folder=self.plot_path) PlotMonthlySummary(self.test_data.stations, path, r"forecasts_%s_test.nc", self.target_var, plot_folder=self.plot_path) PlotClimatologicalSkillScore(self.skill_scores[1], plot_folder=self.plot_path, model_setup="CNN") PlotClimatologicalSkillScore(self.skill_scores[1], plot_folder=self.plot_path, score_only=False, extra_name_tag="all_terms_", model_setup="CNN") PlotCompetitiveSkillScore(self.skill_scores[0], plot_folder=self.plot_path, model_setup="CNN") PlotBootstrapSkillScore(self.bootstrap_skill_scores, plot_folder=self.plot_path, model_setup="CNN") PlotTimeSeries(self.test_data.stations, path, r"forecasts_%s_test.nc", plot_folder=self.plot_path, sampling=self._sampling) def calculate_test_score(self): test_score = self.model.evaluate_generator(generator=self.test_data_distributed.distribute_on_batches(), use_multiprocessing=False, verbose=0, steps=1) logging.info(f"test score = {test_score}") self._save_test_score(test_score) def _save_test_score(self, score): path = self.data_store.get("experiment_path", "general") with open(os.path.join(path, "test_scores.txt")) as f: for index, item in enumerate(score): f.write(f"{self.model.metrics[index]}, {item}\n") def train_ols_model(self): self.ols_model = OrdinaryLeastSquaredModel(self.train_data) def make_prediction(self): logging.debug("start make_prediction") for i, _ in enumerate(self.test_data): data = self.test_data.get_data_generator(i) input_data = data.get_transposed_history() # get scaling parameters mean, std, transformation_method = data.get_transformation_information(variable=self.target_var) for normalised in [True, False]: # create empty arrays nn_prediction, persistence_prediction, ols_prediction = self._create_empty_prediction_arrays(data, count=3) # nn forecast nn_prediction = self._create_nn_forecast(input_data, nn_prediction, mean, std, transformation_method, normalised) # persistence persistence_prediction = self._create_persistence_forecast(data, persistence_prediction, mean, std, transformation_method, normalised) # ols ols_prediction = self._create_ols_forecast(input_data, ols_prediction, mean, std, transformation_method, normalised) # observation observation = self._create_observation(data, None, mean, std, transformation_method, normalised) # merge all predictions full_index = self.create_fullindex(data.data.indexes['datetime'], self._get_frequency()) all_predictions = self.create_forecast_arrays(full_index, list(data.label.indexes['window']), CNN=nn_prediction, persi=persistence_prediction, obs=observation, OLS=ols_prediction) # save all forecasts locally path = self.data_store.get("forecast_path", "general") prefix = "forecasts_norm" if normalised else "forecasts" file = os.path.join(path, f"{prefix}_{data.station[0]}_test.nc") all_predictions.to_netcdf(file) def _get_frequency(self): getter = {"daily": "1D", "hourly": "1H"} return getter.get(self._sampling, None) @staticmethod def _create_observation(data, _, mean, std, transformation_method, normalised): obs = data.observation.copy() if not normalised: obs = statistics.apply_inverse_transformation(obs, mean, std, transformation_method) return obs def _create_ols_forecast(self, input_data, ols_prediction, mean, std, transformation_method, normalised): tmp_ols = self.ols_model.predict(input_data) if not normalised: tmp_ols = statistics.apply_inverse_transformation(tmp_ols, mean, std, transformation_method) tmp_ols = np.expand_dims(tmp_ols, axis=1) target_shape = ols_prediction.values.shape ols_prediction.values = np.swapaxes(tmp_ols, 2, 0) if target_shape != tmp_ols.shape else tmp_ols return ols_prediction def _create_persistence_forecast(self, data, persistence_prediction, mean, std, transformation_method, normalised): tmp_persi = data.observation.copy().sel({'window': 0}) if not normalised: tmp_persi = statistics.apply_inverse_transformation(tmp_persi, mean, std, transformation_method) window_lead_time = self.data_store.get("window_lead_time", "general") persistence_prediction.values = np.expand_dims(np.tile(tmp_persi.squeeze('Stations'), (window_lead_time, 1)), axis=1) return persistence_prediction def _create_nn_forecast(self, input_data, nn_prediction, mean, std, transformation_method, normalised): """ create the nn forecast for given input data. Inverse transformation is applied to the forecast to get the output in the original space. Furthermore, only the output of the main branch is returned (not all minor branches, if the network has multiple output branches). The main branch is defined to be the last entry of all outputs. :param input_data: :param nn_prediction: :param mean: :param std: :param transformation_method: :return: """ tmp_nn = self.model.predict(input_data) if not normalised: tmp_nn = statistics.apply_inverse_transformation(tmp_nn, mean, std, transformation_method) if tmp_nn.ndim == 3: nn_prediction.values = np.swapaxes(np.expand_dims(tmp_nn[-1, ...], axis=1), 2, 0) elif tmp_nn.ndim == 2: nn_prediction.values = np.swapaxes(np.expand_dims(tmp_nn, axis=1), 2, 0) else: raise NotImplementedError(f"Number of dimension of model output must be 2 or 3, but not {tmp_nn.dims}.") return nn_prediction @staticmethod def _create_empty_prediction_arrays(generator, count=1): return [generator.label.copy() for _ in range(count)] @staticmethod def create_fullindex(df, freq): # Diese Funkton erstellt ein leeres df, mit Index der Frequenz frequ zwischen dem ersten und dem letzten Datum in df # param: df as pandas dataframe # param: freq as string # return: index as pandas dataframe if isinstance(df, pd.DataFrame): earliest = df.index[0] latest = df.index[-1] elif isinstance(df, xr.DataArray): earliest = df.index[0].values latest = df.index[-1].values elif isinstance(df, pd.DatetimeIndex): earliest = df[0] latest = df[-1] else: raise AttributeError(f"unknown array type. Only pandas dataframes, xarray dataarrays and pandas datetimes " f"are supported. Given type is {type(df)}.") index = pd.DataFrame(index=pd.date_range(earliest, latest, freq=freq)) return index @staticmethod def create_forecast_arrays(index, ahead_names, **kwargs): """ This function combines different forecast types into one xarray. :param index: as index; index for forecasts (e.g. time) :param ahead_names: as list of str/int: names of ahead values (e.g. hours or days) :param kwargs: as xarrays; data of forecasts :return: xarray of dimension 3: index, ahead_names, # predictions """ keys = list(kwargs.keys()) res = xr.DataArray(np.full((len(index.index), len(ahead_names), len(keys)), np.nan), coords=[index.index, ahead_names, keys], dims=['index', 'ahead', 'type']) for k, v in kwargs.items(): try: match_index = np.stack(set(res.index.values) & set(v.index.values)) res.loc[match_index, :, k] = v.loc[match_index] except AttributeError: # v is xarray type and has no attribute .index match_index = np.stack(set(res.index.values) & set(v.indexes['datetime'].values)) res.loc[match_index, :, k] = v.sel({'datetime': match_index}).squeeze('Stations').transpose() return res def _get_external_data(self, station): try: data = self.train_val_data.get_data_generator(station) mean, std, transformation_method = data.get_transformation_information(variable=self.target_var) external_data = self._create_observation(data, None, mean, std, transformation_method, normalised=False) external_data = external_data.squeeze("Stations").sel(window=1).drop(["window", "Stations", "variables"]) return external_data.rename({'datetime': 'index'}) except KeyError: return None def calculate_skill_scores(self): path = self.data_store.get("forecast_path", "general") window_lead_time = self.data_store.get("window_lead_time", "general") skill_score_competitive = {} skill_score_climatological = {} for station in self.test_data.stations: file = os.path.join(path, f"forecasts_{station}_test.nc") data = xr.open_dataarray(file) skill_score = statistics.SkillScores(data) external_data = self._get_external_data(station) skill_score_competitive[station] = skill_score.skill_scores(window_lead_time) skill_score_climatological[station] = skill_score.climatological_skill_scores(external_data, window_lead_time) return skill_score_competitive, skill_score_climatological