Skip to content
Snippets Groups Projects
Commit 4b9c51c0 authored by lukas leufen's avatar lukas leufen
Browse files

worked on post processing. not ready right now

parent f5d94caa
No related branches found
No related tags found
2 merge requests!24include recent development,!23Lukas issue018 feat evaluate train val
Pipeline #27421 passed
......@@ -10,7 +10,7 @@ from src.run_modules.run_environment import RunEnvironment
from src.run_modules.pre_processing import PreProcessing
from src.run_modules.model_setup import ModelSetup
from src.run_modules.training import Training
from src.run_modules.modules import PostProcessing
from src.run_modules.post_processing import PostProcessing
def main(parser_args):
......
......@@ -2,14 +2,139 @@ __author__ = "Lukas Leufen, Felix Kleinert"
__date__ = '2019-12-11'
import logging
import os
import numpy as np
import pandas as pd
import xarray as xr
import statsmodels.api as sm
from src.run_modules.run_environment import RunEnvironment
from src.data_handling.data_distributor import Distributor
from src.model_modules.linear_model import OrdinaryLeastSquaredModel
from src import statistics
class PostProcessing(RunEnvironment):
    """
    Post-processing run module: evaluates the trained keras model on the test
    data and builds per-station comparison forecasts (CNN, persistence, OLS).

    NOTE(review): this module is explicitly work-in-progress (see commit
    message); `make_prediction_2` computes combined forecast arrays but only
    returns the CNN predictions so far.
    """

    def __init__(self):
        super().__init__()
        # Best model and generators are provided by earlier run modules via the data store.
        self.model = self.data_store.get("best_model", "general")
        self.batch_size = self.data_store.get("batch_size", "general.model")
        self.test_data = Distributor(self.data_store.get("generator", "general.test"), self.model, self.batch_size)
        self.train_data = self.data_store.get("generator", "general.train")
        self._run()

    def _run(self):
        """Entry point: create forecasts for all stations."""
        # BUG FIX: the original body started with a stray `pass` statement
        # before the actual call; the call is the intended action.
        self.make_prediction_2()

    def calculate_test_score(self):
        """Evaluate the model on the distributed test batches and persist the score."""
        test_score = self.model.evaluate(generator=self.test_data.distribute_on_batches(),
                                         use_multiprocessing=False, verbose=0, steps=1)
        logging.info(f"test score = {test_score}")
        self._save_test_score(test_score)

    def _save_test_score(self, score):
        """
        Write one "<metric>, <value>" line per score entry to test_scores.txt
        in the experiment directory.

        :param score: iterable of metric values, aligned with self.model.metrics
        """
        path = self.data_store.get("experiment_path", "general")
        # BUG FIX: the file must be opened for writing; the implicit default
        # mode "r" would raise on f.write (and on a missing file).
        with open(os.path.join(path, "test_scores.txt"), "w") as f:
            for index, item in enumerate(score):
                f.write(f"{self.model.metrics[index]}, {item}\n")

    def make_prediction(self):
        """Run the keras model over the distributed test batches (result currently discarded)."""
        self.model.predict_generator(generator=self.test_data.distribute_on_batches(), steps=1)

    def train_ols_model(self):
        """Fit an ordinary least squares reference model on the training generator."""
        return OrdinaryLeastSquaredModel(self.train_data)

    def make_prediction_2(self, freq="1D"):
        """
        Create CNN, persistence and OLS forecasts for each station of the
        training generator, inverse-transform them to the original o3 scale,
        and align them on a common date index.

        :param freq: frequency string for the full index (default daily, "1D")
        :return: list with one CNN forecast array (xarray) per station
        """
        preds_for_all_stations = []
        ols_model = self.train_ols_model()
        for i, _ in enumerate(self.train_data):
            data = self.train_data.get_data_generator(i)
            # Copies of the label array are used as templates so coords/dims are reused.
            keras_pred = data.label.copy()
            persi_pred = data.label.copy()
            ols_pred = data.label.copy()
            pred_input = self.train_data[i][0]
            # nn forecast
            nn_prediction = self.model.predict(pred_input)
            mean, std, transformation_method = data.get_transformation_information(variable='o3')
            tmp_keras = statistics.apply_inverse_transformation(nn_prediction, mean, std)
            # persistence: last observed value (window 0) carried forward
            tmp_persistence = statistics.apply_inverse_transformation(
                pred_input.sel({'window': 0, 'variables': 'o3'}), mean, std)
            # ols reference
            tmp_ols = statistics.apply_inverse_transformation(ols_model.predict(pred_input), mean, std)
            # observations on original scale
            orig_pred = statistics.apply_inverse_transformation(data.label, mean, std)
            # Reshape raw predictions into the axis layout of the label template.
            keras_pred.values = np.swapaxes(np.expand_dims(tmp_keras, axis=1), 2, 0)
            ols_pred.values = np.swapaxes(np.expand_dims(tmp_ols, axis=1), 2, 0)
            persi_pred.values = np.expand_dims(
                np.tile(tmp_persistence.squeeze('Stations'),
                        (self.data_store.get("window_lead_time", "general"), 1)), axis=1)
            full_index = self.create_fullindex(data.data.indexes['datetime'], freq)
            # NOTE(review): the combined array is computed but not yet used or
            # stored — work in progress.
            all_preds = self.create_forecast_arrays(full_index,
                                                    list(data.label.indexes['window']),
                                                    CNN=keras_pred,
                                                    persi=persi_pred,
                                                    orig=orig_pred,
                                                    OLS=ols_pred)
            preds_for_all_stations.append(keras_pred)
        return preds_for_all_stations

    @staticmethod
    def create_fullindex(df, freq):
        """
        Build an empty DataFrame whose index is a date range with frequency
        `freq` spanning the first to last date found in `df`.

        :param df: pandas DataFrame, xarray DataArray or pandas DatetimeIndex
        :param freq: frequency string (e.g. "1D")
        :return: empty pandas DataFrame carrying the full date range as index
        :raises TypeError: if `df` is none of the supported types
        """
        if isinstance(df, pd.DataFrame):
            earliest = df.index[0]
            latest = df.index[-1]
        elif isinstance(df, xr.DataArray):
            earliest = df.index[0].values
            latest = df.index[-1].values
        elif isinstance(df, pd.DatetimeIndex):
            # public alias of pd.core.indexes.datetimes.DatetimeIndex
            earliest = df[0]
            latest = df[-1]
        else:
            # BUG FIX: previously an unsupported type fell through and raised
            # a confusing NameError on `earliest` instead of a clear error.
            raise TypeError(f"Unsupported index source type: {type(df)}")
        return pd.DataFrame(index=pd.date_range(earliest, latest, freq=freq))

    @staticmethod
    def create_forecast_arrays(index, ahead_names, **kwargs):
        """
        Combine different forecast types into a single xarray.

        :param index: pandas DataFrame whose index is the target time axis
        :param ahead_names: list of str/int names of ahead values (e.g. hours or days)
        :param kwargs: forecast arrays keyed by forecast type name
        :return: xarray.DataArray with dims (index, ahead, type), NaN where no match
        """
        keys = list(kwargs.keys())
        res = xr.DataArray(np.full((len(index.index), len(ahead_names), len(keys)), np.nan),
                           coords=[index.index, ahead_names, keys], dims=['index', 'ahead', 'type'])
        for k, v in kwargs.items():
            # BUG FIX: the original executed each match/assign pair twice in a
            # row (copy-pasted statements); the second run was a redundant no-op.
            try:
                match_index = np.stack(set(res.index.values) & set(v.index.values))
                res.loc[match_index, :, k] = v.loc[match_index]
            except AttributeError:
                # xarray inputs have no .index; their time axis lives in .indexes['datetime']
                match_index = np.stack(set(res.index.values) & set(v.indexes['datetime'].values))
                res.loc[match_index, :, k] = v.sel({'datetime': match_index}).squeeze('Stations').transpose()
        return res
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment