Commit d59f43c2 authored by lukas leufen

add plot and results path to experiment setup, finished raw code for prediction making in postprocessing
parent 4b9c51c0
2 merge requests: !24 include recent development, !23 Lukas issue018 feat evaluate train val
Pipeline #27604 passed
@@ -5,6 +5,7 @@ __date__ = '2019-11-15'

 import logging
 import argparse
 from typing import Union, Dict, Any
+import os

 from src import helpers
 from src.run_modules.run_environment import RunEnvironment
@@ -32,7 +33,7 @@ class ExperimentSetup(RunEnvironment):
                  window_lead_time=None, dimensions=None, interpolate_dim=None, interpolate_method=None,
                  limit_nan_fill=None, train_start=None, train_end=None, val_start=None, val_end=None, test_start=None,
                  test_end=None, use_all_stations_on_all_data_sets=True, trainable=False, fraction_of_train=None,
-                 experiment_path=None):
+                 experiment_path=None, plot_path=None, forecast_path=None):

         # create run framework
         super().__init__()
@@ -49,6 +50,16 @@ class ExperimentSetup(RunEnvironment):
         self._set_param("experiment_path", exp_path)
         helpers.check_path_and_create(self.data_store.get("experiment_path", "general"))

+        # set plot path
+        default_plot_path = os.path.join(exp_path, "plots")
+        self._set_param("plot_path", plot_path, default=default_plot_path)
+        helpers.check_path_and_create(self.data_store.get("plot_path", "general"))
+
+        # set results path
+        default_forecast_path = os.path.join(exp_path, "forecasts")
+        self._set_param("forecast_path", forecast_path, default_forecast_path)
+        helpers.check_path_and_create(self.data_store.get("forecast_path", "general"))
+
         # setup for data
         self._set_param("var_all_dict", var_all_dict, default=DEFAULT_VAR_ALL_DICT)
         self._set_param("stations", stations, default=DEFAULT_STATIONS)
...
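For context, a minimal sketch of the behavior the new path-setup lines rely on. This is an assumption, not the project's actual implementation; `set_param_sketch` and its dict-backed store are hypothetical stand-ins for `_set_param` and the data store:

import os

def check_path_and_create(path):
    # assumed behavior: create the directory (and any parents) if it is missing
    os.makedirs(path, exist_ok=True)

def set_param_sketch(store, name, value, default=None):
    # assumed behavior of _set_param: fall back to the default when the caller
    # passed None, then register the result (a plain dict stands in for the data store)
    store[name] = value if value is not None else default

store = {}
set_param_sketch(store, "plot_path", None, default=os.path.join("testrun", "plots"))
check_path_and_create(store["plot_path"])  # creates testrun/plots if absent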
@@ -14,6 +14,8 @@ from src.run_modules.run_environment import RunEnvironment
 from src.data_handling.data_distributor import Distributor
 from src.model_modules.linear_model import OrdinaryLeastSquaredModel
 from src import statistics
+from src import helpers
+from src.helpers import TimeTracking


 class PostProcessing(RunEnvironment):

@@ -21,17 +23,20 @@ class PostProcessing(RunEnvironment):
     def __init__(self):
         super().__init__()
         self.model = self.data_store.get("best_model", "general")
+        self.ols_model = None
         self.batch_size = self.data_store.get("batch_size", "general.model")
-        self.test_data = Distributor(self.data_store.get("generator", "general.test"), self.model, self.batch_size)
+        self.test_data = self.data_store.get("generator", "general.test")
+        self.test_data_distributed = Distributor(self.test_data, self.model, self.batch_size)
         self.train_data = self.data_store.get("generator", "general.train")
         self._run()

     def _run(self):
-        preds_for_all_stations = self.make_prediction_2()
+        self.train_ols_model()
+        preds_for_all_stations = self.make_prediction()

     def calculate_test_score(self):
-        test_score = self.model.evaluate(generator=self.test_data.distribute_on_batches(), use_multiprocessing=False,
-                                         verbose=0, steps=1)
+        test_score = self.model.evaluate(generator=self.test_data_distributed.distribute_on_batches(),
+                                         use_multiprocessing=False, verbose=0, steps=1)
         logging.info(f"test score = {test_score}")
         self._save_test_score(test_score)
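The split of `test_data` and `test_data_distributed` keeps the raw generator available for station-wise access in `make_prediction`, while Keras consumes the batch-wise wrapper. A toy stand-in illustrating that division of labor; all names below are hypothetical substitutes, not the project's `Distributor` API:

class ToyDistributor:
    def __init__(self, generator, batch_size):
        self.generator = generator      # kept by reference, still usable directly
        self.batch_size = batch_size

    def distribute_on_batches(self):
        # group single samples from the wrapped generator into fixed-size batches
        batch = []
        for sample in self.generator:
            batch.append(sample)
            if len(batch) == self.batch_size:
                yield batch
                batch = []
        if batch:
            yield batch

dist = ToyDistributor(iter(["stationA", "stationB", "stationC"]), batch_size=2)
print(list(dist.distribute_on_batches()))  # [['stationA', 'stationB'], ['stationC']]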
@@ -41,52 +46,77 @@ class PostProcessing(RunEnvironment):
             for index, item in enumerate(score):
                 f.write(f"{self.model.metrics[index]}, {item}\n")

-    def make_prediction(self):
-        self.model.predict_generator(generator=self.test_data.distribute_on_batches(), steps=1)
-
     def train_ols_model(self):
-        return OrdinaryLeastSquaredModel(self.train_data)
-
-    def make_prediction_2(self, freq="1D"):
-        preds_for_all_stations = []
-        ols_model = self.train_ols_model()
-        failed_stations = []
-        for i, v in enumerate(self.train_data):
-            data = self.train_data.get_data_generator(i)
-            keras_pred = data.label.copy()
-            persi_pred = data.label.copy()
-            ols_pred = data.label.copy()
-            pred_input = self.train_data[i][0]
-
-            # nn forecast
-            nn_prediction = self.model.predict(pred_input)
+        self.ols_model = OrdinaryLeastSquaredModel(self.train_data)

+    def make_prediction(self, freq="1D"):
+        nn_prediction_all_stations = []
+        for i, v in enumerate(self.test_data):
+            data = self.test_data.get_data_generator(i)
+            nn_prediction, persistence_prediction, ols_prediction = self._create_empty_prediction_arrays(data, count=3)
+            input_data = self.test_data[i][0]
+
+            # get scaling parameters
             mean, std, transformation_method = data.get_transformation_information(variable='o3')
-            tmp_keras = statistics.apply_inverse_transformation(nn_prediction, mean, std)
+
+            # nn forecast
+            nn_prediction = self._create_nn_forecast(input_data, nn_prediction, mean, std, transformation_method)

             # persistence
-            tmp_persistence = statistics.apply_inverse_transformation(pred_input.sel({'window': 0, 'variables': 'o3'}), mean, std)
+            persistence_prediction = self._create_persistence_forecast(input_data, persistence_prediction, mean, std,
+                                                                       transformation_method)

             # ols
-            tmp_ols = statistics.apply_inverse_transformation(ols_model.predict(pred_input), mean, std)
+            ols_prediction = self._create_ols_forecast(input_data, ols_prediction, mean, std, transformation_method)

             # orig pred
-            orig_pred = statistics.apply_inverse_transformation(data.label, mean, std)
-
-            keras_pred.values = np.swapaxes(np.expand_dims(tmp_keras, axis=1), 2, 0)
-            ols_pred.values = np.swapaxes(np.expand_dims(tmp_ols, axis=1), 2, 0)
-            persi_pred.values = np.expand_dims(np.tile(tmp_persistence.squeeze('Stations'), (self.data_store.get("window_lead_time", "general"), 1)), axis=1)
+            orig_pred = self._create_orig_forecast(data, None, mean, std, transformation_method)

+            # merge all predictions
             full_index = self.create_fullindex(data.data.indexes['datetime'], freq)
-            all_preds = self.create_forecast_arrays(full_index,
-                                                    list(data.label.indexes['window']),
-                                                    CNN=keras_pred,
-                                                    persi=persi_pred,
-                                                    orig=orig_pred,
-                                                    OLS=ols_pred)
+            all_predictions = self.create_forecast_arrays(full_index, list(data.label.indexes['window']),
+                                                          CNN=nn_prediction,
+                                                          persi=persistence_prediction,
+                                                          orig=orig_pred,
+                                                          OLS=ols_prediction)

-            preds_for_all_stations.append(keras_pred)
-
-        return preds_for_all_stations
+            # save all forecasts locally
+            path = self.data_store.get("forecast_path", "general")
+            file = os.path.join(path, f"forecasts_{data.station[0]}_test.nc")
+            all_predictions.to_netcdf(file)
+
+            # save nn forecast to return variable
+            nn_prediction_all_stations.append(nn_prediction)
+
+        return nn_prediction_all_stations
+
+    @staticmethod
+    def _create_orig_forecast(data, placeholder, mean, std, transformation_method):
+        return statistics.apply_inverse_transformation(data.label, mean, std, transformation_method)
+
+    def _create_ols_forecast(self, input_data, ols_prediction, mean, std, transformation_method):
+        tmp_ols = self.ols_model.predict(input_data)
+        tmp_ols = statistics.apply_inverse_transformation(tmp_ols, mean, std, transformation_method)
+        ols_prediction.values = np.swapaxes(np.expand_dims(tmp_ols, axis=1), 2, 0)
+        return ols_prediction
+
+    def _create_persistence_forecast(self, input_data, persistence_prediction, mean, std, transformation_method):
+        tmp_persi = input_data.sel({'window': 0, 'variables': 'o3'})
+        tmp_persi = statistics.apply_inverse_transformation(tmp_persi, mean, std, transformation_method)
+        window_lead_time = self.data_store.get("window_lead_time", "general")
+        persistence_prediction.values = np.expand_dims(np.tile(tmp_persi.squeeze('Stations'), (window_lead_time, 1)),
+                                                       axis=1)
+        return persistence_prediction
+
+    def _create_nn_forecast(self, input_data, nn_prediction, mean, std, transformation_method):
+        tmp_nn = self.model.predict(input_data)
+        tmp_nn = statistics.apply_inverse_transformation(tmp_nn, mean, std, transformation_method)
+        nn_prediction.values = np.swapaxes(np.expand_dims(tmp_nn, axis=1), 2, 0)
+        return nn_prediction
+
+    @staticmethod
+    def _create_empty_prediction_arrays(generator, count=1):
+        # build independent copies; `[generator.label.copy()] * count` would copy
+        # once and alias the same object count times
+        return [generator.label.copy() for _ in range(count)]

     @staticmethod
     def create_fullindex(df, freq):
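Why `_create_empty_prediction_arrays` must return independent copies: list repetition calls `.copy()` once and aliases the result, so every later `.values` assignment would silently overwrite all three forecast arrays. A standalone illustration:

import xarray as xr

label = xr.DataArray([1.0, 2.0])
a, b, c = [label.copy()] * 3                 # one copy, three names
assert a is b is c                           # all aliases of the same object
a, b, c = [label.copy() for _ in range(3)]   # three independent copies
assert a is not b and b is not c             # safe to fill separately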
@@ -103,13 +133,15 @@ class PostProcessing(RunEnvironment):
         elif isinstance(df, pd.core.indexes.datetimes.DatetimeIndex):
             earliest = df[0]
             latest = df[-1]
+        else:
+            raise AttributeError(f"unknown array type. Only pandas dataframes, xarray dataarrays and pandas datetimes "
+                                 f"are supported. Given type is {type(df)}.")
         index = pd.DataFrame(index=pd.date_range(earliest, latest, freq=freq))
-        #
         return index

     @staticmethod
     def create_forecast_arrays(index, ahead_names, **kwargs):
-        '''
+        """
         This function combines different forecast types into one xarray.

         :param index: as index; index for forecasts (e.g. time)
@@ -117,22 +149,15 @@ class PostProcessing(RunEnvironment):
         :param kwargs: as xarrays; data of forecasts
         :return: xarray of dimension 3: index, ahead_names, # predictions
-        '''
-        #
+        """
         keys = list(kwargs.keys())
-        vals = list(kwargs.values())
-        #
         res = xr.DataArray(np.full((len(index.index), len(ahead_names), len(keys)), np.nan),
                            coords=[index.index, ahead_names, keys], dims=['index', 'ahead', 'type'])
         for k, v in kwargs.items():
             try:
                 match_index = np.stack(set(res.index.values) & set(v.index.values))
                 res.loc[match_index, :, k] = v.loc[match_index]
-                match_index = np.stack(set(res.index.values) & set(v.index.values))
-                res.loc[match_index, :, k] = v.loc[match_index]
-            except AttributeError:
-                match_index = np.stack(set(res.index.values) & set(v.indexes['datetime'].values))
-                res.loc[match_index, :, k] = v.sel({'datetime': match_index}).squeeze('Stations').transpose()
+            except AttributeError:  # v is xarray type and has no attribute .index
                 match_index = np.stack(set(res.index.values) & set(v.indexes['datetime'].values))
                 res.loc[match_index, :, k] = v.sel({'datetime': match_index}).squeeze('Stations').transpose()
         return res
...
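How the two static helpers fit together, as a small self-contained sketch with toy inputs. The `Stations` dimension and the real data generators are omitted, and `PostProcessing` is assumed to be importable from the package:

import numpy as np
import pandas as pd

# hypothetical toy setup: three daily timestamps and two lead times
timestamps = pd.DatetimeIndex(["2019-01-01", "2019-01-02", "2019-01-03"])
full_index = PostProcessing.create_fullindex(timestamps, freq="1D")
ahead = [1, 2]
cnn = pd.DataFrame(np.random.rand(3, 2), index=full_index.index, columns=ahead)

# rows are matched via the shared datetime index; unmatched dates stay NaN
res = PostProcessing.create_forecast_arrays(full_index, ahead, CNN=cnn)
print(res.dims, res.shape)  # ('index', 'ahead', 'type') (3, 2, 1)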