From 4b9c51c0a5f2ed3227a92e3edff81a921c4396ed Mon Sep 17 00:00:00 2001
From: lukas leufen <l.leufen@fz-juelich.de>
Date: Thu, 12 Dec 2019 14:41:30 +0100
Subject: [PATCH] worked on post processing. not ready right now

---
 run.py                             |   2 +-
 src/run_modules/post_processing.py | 127 ++++++++++++++++++++++++++++-
 2 files changed, 127 insertions(+), 2 deletions(-)

diff --git a/run.py b/run.py
index 18189f95..03eda042 100644
--- a/run.py
+++ b/run.py
@@ -10,7 +10,7 @@ from src.run_modules.run_environment import RunEnvironment
 from src.run_modules.pre_processing import PreProcessing
 from src.run_modules.model_setup import ModelSetup
 from src.run_modules.training import Training
-from src.run_modules.modules import PostProcessing
+from src.run_modules.post_processing import PostProcessing
 
 
 def main(parser_args):
diff --git a/src/run_modules/post_processing.py b/src/run_modules/post_processing.py
index c76ef1cc..cd6a8d38 100644
--- a/src/run_modules/post_processing.py
+++ b/src/run_modules/post_processing.py
@@ -2,14 +2,139 @@ __author__ = "Lukas Leufen, Felix Kleinert"
 __date__ = '2019-12-11'
 
+import logging
+import os
+
+import numpy as np
+import pandas as pd
+import xarray as xr
+
 from src.run_modules.run_environment import RunEnvironment
+from src.data_handling.data_distributor import Distributor
+from src.model_modules.linear_model import OrdinaryLeastSquaredModel
+from src import statistics
 
 
 class PostProcessing(RunEnvironment):
 
     def __init__(self):
         super().__init__()
+        self.model = self.data_store.get("best_model", "general")
+        self.batch_size = self.data_store.get("batch_size", "general.model")
+        self.test_data = Distributor(self.data_store.get("generator", "general.test"), self.model, self.batch_size)
+        self.train_data = self.data_store.get("generator", "general.train")
         self._run()
 
     def _run(self):
-        pass
+        preds_for_all_stations = self.make_prediction_2()
+
+    def calculate_test_score(self):
+        test_score = self.model.evaluate_generator(generator=self.test_data.distribute_on_batches(),
+                                                   use_multiprocessing=False, verbose=0, steps=1)
+        logging.info(f"test score = {test_score}")
+        self._save_test_score(test_score)
+
+    def _save_test_score(self, score):
+        path = self.data_store.get("experiment_path", "general")
+        with open(os.path.join(path, "test_scores.txt"), "a") as f:
+            for index, item in enumerate(score):
+                f.write(f"{self.model.metrics_names[index]}, {item}\n")
+
+    def make_prediction(self):
+        return self.model.predict_generator(generator=self.test_data.distribute_on_batches(), steps=1)
+
+    def train_ols_model(self):
+        return OrdinaryLeastSquaredModel(self.train_data)
+
+    def make_prediction_2(self, freq="1D"):
+        preds_for_all_stations = []
+        ols_model = self.train_ols_model()
+        failed_stations = []
+        for i, _ in enumerate(self.train_data):
+            data = self.train_data.get_data_generator(i)
+            keras_pred = data.label.copy()
+            persi_pred = data.label.copy()
+            ols_pred = data.label.copy()
+            pred_input = self.train_data[i][0]
+
+            # nn forecast
+            nn_prediction = self.model.predict(pred_input)
+            mean, std, transformation_method = data.get_transformation_information(variable='o3')
+            tmp_keras = statistics.apply_inverse_transformation(nn_prediction, mean, std)
+
+            # persistence: repeat the last observed value (window 0) for all lead times
+            tmp_persistence = statistics.apply_inverse_transformation(
+                pred_input.sel({'window': 0, 'variables': 'o3'}), mean, std)
+
+            # ols
+            tmp_ols = statistics.apply_inverse_transformation(ols_model.predict(pred_input), mean, std)
+
+            # observation ("orig")
+            orig_pred = statistics.apply_inverse_transformation(data.label, mean, std)
+
+            keras_pred.values = np.swapaxes(np.expand_dims(tmp_keras, axis=1), 2, 0)
+            ols_pred.values = np.swapaxes(np.expand_dims(tmp_ols, axis=1), 2, 0)
+            persi_pred.values = np.expand_dims(
+                np.tile(tmp_persistence.squeeze('Stations'),
+                        (self.data_store.get("window_lead_time", "general"), 1)), axis=1)
+
+            full_index = self.create_fullindex(data.data.indexes['datetime'], freq)
+            all_preds = self.create_forecast_arrays(full_index,
+                                                    list(data.label.indexes['window']),
+                                                    CNN=keras_pred,
+                                                    persi=persi_pred,
+                                                    orig=orig_pred,
+                                                    OLS=ols_pred)
+
+            preds_for_all_stations.append(keras_pred)
+
+        return preds_for_all_stations
+
+    @staticmethod
+    def create_fullindex(df, freq):
+        '''
+        Create an empty DataFrame whose index runs with frequency freq from the first to the last date in df.
+
+        :param df: pandas DataFrame, xarray DataArray or pandas DatetimeIndex to take the date range from
+        :param freq: frequency as string (e.g. "1D")
+        :return: empty pandas DataFrame carrying the full DatetimeIndex
+        '''
+        if isinstance(df, pd.DataFrame):
+            earliest = df.index[0]
+            latest = df.index[-1]
+        elif isinstance(df, xr.DataArray):
+            earliest = df.index[0].values
+            latest = df.index[-1].values
+        elif isinstance(df, pd.DatetimeIndex):
+            earliest = df[0]
+            latest = df[-1]
+        else:
+            raise TypeError(f"unknown index type: {type(df)}")
+        index = pd.DataFrame(index=pd.date_range(earliest, latest, freq=freq))
+        return index
+
+    @staticmethod
+    def create_forecast_arrays(index, ahead_names, **kwargs):
+        '''
+        Combine different forecast types into one xarray.
+
+        :param index: pandas DataFrame whose index is used as forecast index (e.g. time)
+        :param ahead_names: list of str/int: names of the ahead values (e.g. hours or days)
+        :param kwargs: forecast data as xarrays
+        :return: xarray of dimension 3: index, ahead_names, # predictions
+        '''
+        keys = list(kwargs.keys())
+        res = xr.DataArray(np.full((len(index.index), len(ahead_names), len(keys)), np.nan),
+                           coords=[index.index, ahead_names, keys], dims=['index', 'ahead', 'type'])
+        for k, v in kwargs.items():
+            try:
+                match_index = np.stack(set(res.index.values) & set(v.index.values))
+                res.loc[match_index, :, k] = v.loc[match_index]
+            except AttributeError:
+                match_index = np.stack(set(res.index.values) & set(v.indexes['datetime'].values))
+                res.loc[match_index, :, k] = v.sel({'datetime': match_index}).squeeze('Stations').transpose()
+        return res
+
+
--
GitLab
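
The alignment performed by create_fullindex and create_forecast_arrays is easiest to see on toy data. The sketch below is a minimal, self-contained rendering of the same three steps: build a continuous daily index, allocate a NaN-filled (index, ahead, type) array, and copy each forecast onto the dates it covers. All dates and values are invented, the "CNN"/"OLS" labels just mirror the keyword names in the patch, and the two helpers are inlined rather than imported.

import numpy as np
import pandas as pd
import xarray as xr

rng = np.random.default_rng(0)
ahead_names = [1, 2, 3]  # forecast horizons, e.g. days ahead

# two toy "forecasts" sharing a time axis with a gap (Jan 04 is missing)
dates = pd.to_datetime(["2019-01-01", "2019-01-02", "2019-01-03", "2019-01-05"])
cnn = xr.DataArray(rng.random((len(dates), len(ahead_names))),
                   coords=[dates, ahead_names], dims=["index", "ahead"])
ols = xr.DataArray(rng.random((len(dates), len(ahead_names))),
                   coords=[dates, ahead_names], dims=["index", "ahead"])

# step 1 (create_fullindex): continuous daily index from first to last date
full_index = pd.date_range(dates[0], dates[-1], freq="1D")

# step 2 (create_forecast_arrays): NaN-filled result, one slice per forecast type
forecasts = {"CNN": cnn, "OLS": ols}
res = xr.DataArray(np.full((len(full_index), len(ahead_names), len(forecasts)), np.nan),
                   coords=[full_index, ahead_names, list(forecasts)],
                   dims=["index", "ahead", "type"])

# step 3: copy each forecast onto the dates both indexes share; gaps stay NaN
for name, arr in forecasts.items():
    match_index = np.array(sorted(set(res.index.values) & set(arr.index.values)))
    res.loc[match_index, :, name] = arr.loc[match_index]

print(res.sel(type="CNN"))  # the Jan 04 row is NaN

Building the result on the full index rather than on any single forecast's own index keeps the gaps explicit as NaN rows and makes all forecast types comparable row by row.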
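
The persistence baseline in make_prediction_2 reduces to one numpy call: the most recent observation (window 0) is tiled across every lead time. A minimal sketch with invented numbers; in the patch the values come from pred_input.sel({'window': 0, 'variables': 'o3'}) after inverting the standardisation.

import numpy as np

window_lead_time = 3                     # ahead steps; read from the data store in the patch
last_obs = np.array([47.0, 52.5, 49.1])  # hypothetical last o3 value for three samples

# repeat the last observation over all lead times -> shape (lead, samples),
# matching the np.tile(..., (window_lead_time, 1)) call in make_prediction_2
persistence = np.tile(last_obs, (window_lead_time, 1))
print(persistence)  # each row equals last_obs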