diff --git a/src/run_modules/experiment_setup.py b/src/run_modules/experiment_setup.py
index f502e81b81d93cbcdc1d9ad2709d21c9c0d3ec27..a46e2b173ed47bdf1f0a364063972af9acb60781 100644
--- a/src/run_modules/experiment_setup.py
+++ b/src/run_modules/experiment_setup.py
@@ -5,6 +5,7 @@ __date__ = '2019-11-15'
 import logging
 import argparse
 from typing import Union, Dict, Any
+import os
 
 from src import helpers
 from src.run_modules.run_environment import RunEnvironment
@@ -32,7 +33,7 @@ class ExperimentSetup(RunEnvironment):
                  window_lead_time=None, dimensions=None, interpolate_dim=None, interpolate_method=None,
                  limit_nan_fill=None, train_start=None, train_end=None, val_start=None, val_end=None, test_start=None,
                  test_end=None, use_all_stations_on_all_data_sets=True, trainable=False, fraction_of_train=None,
-                 experiment_path=None):
+                 experiment_path=None, plot_path=None, forecast_path=None):
 
         # create run framework
         super().__init__()
@@ -49,6 +50,16 @@ class ExperimentSetup(RunEnvironment):
         self._set_param("experiment_path", exp_path)
         helpers.check_path_and_create(self.data_store.get("experiment_path", "general"))
 
+        # set plot path
+        default_plot_path = os.path.join(exp_path, "plots")
+        self._set_param("plot_path", plot_path, default=default_plot_path)
+        helpers.check_path_and_create(self.data_store.get("plot_path", "general"))
+
+        # set forecast path
+        default_forecast_path = os.path.join(exp_path, "forecasts")
+        self._set_param("forecast_path", forecast_path, default=default_forecast_path)
+        helpers.check_path_and_create(self.data_store.get("forecast_path", "general"))
+
         # setup for data
         self._set_param("var_all_dict", var_all_dict, default=DEFAULT_VAR_ALL_DICT)
         self._set_param("stations", stations, default=DEFAULT_STATIONS)
diff --git a/src/run_modules/post_processing.py b/src/run_modules/post_processing.py
index cd6a8d38a5b3dfebfba3f4a290c52ae8627cdd99..35d93dcbd932d1c298c0744fcd0205697576bb4c 100644
--- a/src/run_modules/post_processing.py
+++ b/src/run_modules/post_processing.py
@@ -14,6 +14,8 @@ from src.run_modules.run_environment import RunEnvironment
 from src.data_handling.data_distributor import Distributor
 from src.model_modules.linear_model import OrdinaryLeastSquaredModel
 from src import statistics
+from src import helpers
+from src.helpers import TimeTracking
 
 
 class PostProcessing(RunEnvironment):
@@ -21,17 +23,20 @@ class PostProcessing(RunEnvironment):
     def __init__(self):
         super().__init__()
         self.model = self.data_store.get("best_model", "general")
+        self.ols_model = None
         self.batch_size = self.data_store.get("batch_size", "general.model")
-        self.test_data = Distributor(self.data_store.get("generator", "general.test"), self.model, self.batch_size)
+        self.test_data = self.data_store.get("generator", "general.test")
+        self.test_data_distributed = Distributor(self.test_data, self.model, self.batch_size)
         self.train_data = self.data_store.get("generator", "general.train")
         self._run()
 
     def _run(self):
-        preds_for_all_stations = self.make_prediction_2()
+        self.train_ols_model()
+        preds_for_all_stations = self.make_prediction()
 
     def calculate_test_score(self):
-        test_score = self.model.evaluate(generator=self.test_data.distribute_on_batches(), use_multiprocessing=False,
-                                         verbose=0, steps=1)
+        test_score = self.model.evaluate(generator=self.test_data_distributed.distribute_on_batches(),
+                                         use_multiprocessing=False, verbose=0, steps=1)
         logging.info(f"test score = {test_score}")
         self._save_test_score(test_score)
 
@@ -41,52 +46,77 @@ class PostProcessing(RunEnvironment):
             for index, item in enumerate(score):
f.write(f"{self.model.metrics[index]}, {item}\n") - def make_prediction(self): - self.model.predict_generator(generator=self.test_data.distribute_on_batches(), steps=1) - def train_ols_model(self): - return OrdinaryLeastSquaredModel(self.train_data) - - def make_prediction_2(self, freq="1D"): - preds_for_all_stations = [] - ols_model = self.train_ols_model() - failed_stations = [] - for i, v in enumerate(self.train_data): - data = self.train_data.get_data_generator(i) - keras_pred = data.label.copy() - persi_pred = data.label.copy() - ols_pred = data.label.copy() - pred_input = self.train_data[i][0] + self.ols_model = OrdinaryLeastSquaredModel(self.train_data) - # nn forecast - nn_prediction = self.model.predict(pred_input) + def make_prediction(self, freq="1D"): + nn_prediction_all_stations = [] + for i, v in enumerate(self.test_data): + data = self.test_data.get_data_generator(i) + + nn_prediction, persistence_prediction, ols_prediction = self._create_empty_prediction_arrays(data, count=3) + input_data = self.test_data[i][0] + + # get scaling parameters mean, std, transformation_method = data.get_transformation_information(variable='o3') - tmp_keras = statistics.apply_inverse_transformation(nn_prediction, mean, std) + + # nn forecast + nn_prediction = self._create_nn_forecast(input_data, nn_prediction, mean, std, transformation_method) # persistence - tmp_persistence = statistics.apply_inverse_transformation(pred_input.sel({'window': 0, 'variables': 'o3'}), mean, std) + persistence_prediction = self._create_persistence_forecast(input_data, persistence_prediction, mean, std, + transformation_method) # ols - tmp_ols = statistics.apply_inverse_transformation(ols_model.predict(pred_input), mean, std) + ols_prediction = self._create_ols_forecast(input_data, ols_prediction, mean, std, transformation_method) # orig pred - orig_pred = statistics.apply_inverse_transformation(data.label, mean, std) - - keras_pred.values = np.swapaxes(np.expand_dims(tmp_keras, axis=1), 2, 0) - ols_pred.values = np.swapaxes(np.expand_dims(tmp_ols, axis=1), 2, 0) - persi_pred.values = np.expand_dims(np.tile(tmp_persistence.squeeze('Stations'), (self.data_store.get("window_lead_time", "general"), 1)), axis=1) + orig_pred = self._create_orig_forecast(data, None, mean, std, transformation_method) + # merge all predictions full_index = self.create_fullindex(data.data.indexes['datetime'], freq) - all_preds = self.create_forecast_arrays(full_index, - list(data.label.indexes['window']), - CNN=keras_pred, - persi=persi_pred, - orig=orig_pred, - OLS=ols_pred) + all_predictions = self.create_forecast_arrays(full_index, list(data.label.indexes['window']), + CNN=nn_prediction, + persi=persistence_prediction, + orig=orig_pred, + OLS=ols_prediction) - preds_for_all_stations.append(keras_pred) + # save all forecasts locally + path = self.data_store.get("forecast_path", "general") + file = os.path.join(path, f"forecasts_{data.station[0]}_test.nc") + all_predictions.to_netcdf(file) - return preds_for_all_stations + # save nn forecast to return variable + nn_prediction_all_stations.append(nn_prediction) + return nn_prediction_all_stations + + @staticmethod + def _create_orig_forecast(data, placeholder, mean, std, transformation_method): + return statistics.apply_inverse_transformation(data.label, mean, std, transformation_method) + + def _create_ols_forecast(self, input_data, ols_prediction, mean, std, transformation_method): + tmp_ols = self.ols_model.predict(input_data) + tmp_ols = statistics.apply_inverse_transformation(tmp_ols, 
+        ols_prediction.values = np.swapaxes(np.expand_dims(tmp_ols, axis=1), 2, 0)
+        return ols_prediction
+
+    def _create_persistence_forecast(self, input_data, persistence_prediction, mean, std, transformation_method):
+        tmp_persi = input_data.sel({'window': 0, 'variables': 'o3'})
+        tmp_persi = statistics.apply_inverse_transformation(tmp_persi, mean, std, transformation_method)
+        window_lead_time = self.data_store.get("window_lead_time", "general")
+        persistence_prediction.values = np.expand_dims(np.tile(tmp_persi.squeeze('Stations'), (window_lead_time, 1)),
+                                                       axis=1)
+        return persistence_prediction
+
+    def _create_nn_forecast(self, input_data, nn_prediction, mean, std, transformation_method):
+        tmp_nn = self.model.predict(input_data)
+        tmp_nn = statistics.apply_inverse_transformation(tmp_nn, mean, std, transformation_method)
+        nn_prediction.values = np.swapaxes(np.expand_dims(tmp_nn, axis=1), 2, 0)
+        return nn_prediction
+
+    @staticmethod
+    def _create_empty_prediction_arrays(generator, count=1):
+        return [generator.label.copy() for _ in range(count)]  # independent copies, not count aliases of one array
 
     @staticmethod
     def create_fullindex(df, freq):
@@ -103,13 +133,15 @@ class PostProcessing(RunEnvironment):
         elif isinstance(df, pd.core.indexes.datetimes.DatetimeIndex):
             earliest = df[0]
             latest = df[-1]
+        else:
+            raise AttributeError(f"unknown array type. Only pandas dataframes, xarray dataarrays and pandas datetimes "
+                                 f"are supported. Given type is {type(df)}.")
         index = pd.DataFrame(index=pd.date_range(earliest, latest, freq=freq))
-        #
         return index
 
     @staticmethod
     def create_forecast_arrays(index, ahead_names, **kwargs):
-        '''
+        """
         This function combines different forecast types into one xarray.
 
         :param index: as index; index for forecasts (e.g. time)
@@ -117,22 +149,15 @@
         :param kwargs: as xarrays; data of forecasts
         :return: xarray of dimension 3: index, ahead_names, # predictions
 
-        '''
-        #
+        """
         keys = list(kwargs.keys())
-        vals = list(kwargs.values())
-        #
         res = xr.DataArray(np.full((len(index.index), len(ahead_names), len(keys)), np.nan),
                            coords=[index.index, ahead_names, keys], dims=['index', 'ahead', 'type'])
        for k, v in kwargs.items():
             try:
                 match_index = np.stack(set(res.index.values) & set(v.index.values))
                 res.loc[match_index, :, k] = v.loc[match_index]
-                match_index = np.stack(set(res.index.values) & set(v.index.values))
-                res.loc[match_index, :, k] = v.loc[match_index]
-            except AttributeError:
-                match_index = np.stack(set(res.index.values) & set(v.indexes['datetime'].values))
-                res.loc[match_index, :, k] = v.sel({'datetime': match_index}).squeeze('Stations').transpose()
+            except AttributeError:  # v is xarray type and has no attribute .index
                 match_index = np.stack(set(res.index.values) & set(v.indexes['datetime'].values))
                 res.loc[match_index, :, k] = v.sel({'datetime': match_index}).squeeze('Stations').transpose()
         return res
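
Note on the new paths: with plot_path and forecast_path left at None, ExperimentSetup derives both from experiment_path and creates them via helpers.check_path_and_create, so all run output stays under one root. A minimal sketch of the resulting layout; the root name "testrun_network" is only an assumed example, not taken from this patch:

    <experiment_path>/        e.g. testrun_network/
    ├── plots/                default plot_path
    └── forecasts/            default forecast_path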
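
Note on the inverse transformation: every forecast branch in make_prediction (nn, persistence, ols, orig) is routed through statistics.apply_inverse_transformation with the per-station mean, std and transformation_method. For plain standardisation this boils down to a simple rescale; a minimal sketch of that idea, assuming z-score standardisation (the real function additionally dispatches on transformation_method, whose options this patch does not show):

    def inverse_standardise(data, mean, std):
        # undo z-score scaling applied during preprocessing: x = x_norm * std + mean
        return data * std + mean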
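
Note on the saved forecasts: make_prediction now writes one netCDF file per station containing the merged xarray built by create_forecast_arrays. A quick way to inspect such a file; the forecast path value and station id below are hypothetical:

    import os
    import xarray as xr

    forecast_path = "testrun_network/forecasts"  # assumed value of the "forecast_path" data store entry
    station = "DEBW107"                          # hypothetical station id
    forecasts = xr.open_dataarray(os.path.join(forecast_path, f"forecasts_{station}_test.nc"))
    print(forecasts.dims)                    # ('index', 'ahead', 'type')
    print(forecasts.coords['type'].values)   # ['CNN' 'persi' 'orig' 'OLS']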