diff --git a/src/data_handling/__init__.py b/src/data_handling/__init__.py index a689cbe3449f09a1447205f0271244c8dcaa83f5..5139d13da91025dad15db04fdfea34309c4e28ff 100644 --- a/src/data_handling/__init__.py +++ b/src/data_handling/__init__.py @@ -7,3 +7,9 @@ postprocessing, loading, and distribution for training. __author__ = 'Lukas Leufen, Felix Kleinert' __date__ = '2020-04-17' + + +from .bootstraps import BootStraps +from .data_preparation import DataPrep +from .data_generator import DataGenerator +from .data_distributor import Distributor diff --git a/src/data_handling/data_generator.py b/src/data_handling/data_generator.py index 34d0955833b1534f3f86e54ec1c98ed3f8ad4cc1..0f8b9959c72d5f01c63e85d85e8fbf570ae6e23c 100644 --- a/src/data_handling/data_generator.py +++ b/src/data_handling/data_generator.py @@ -278,6 +278,7 @@ class DataGenerator(keras.utils.Sequence): :param load_local_tmp_storage: say if data should be processed from scratch or loaded as already processed data from tmp pickle file to save computational time (but of course more disk space required). :param save_local_tmp_storage: save processed data as temporal file locally (default True) + :return: preprocessed data as a DataPrep instance """ station = self.get_station_key(key) diff --git a/src/helpers/statistics.py b/src/helpers/statistics.py index 74bac269fb904db81365450e92536e8c494bf027..056f92bec25b8d5216988f4dacb8fcd1e5257ab5 100644 --- a/src/helpers/statistics.py +++ b/src/helpers/statistics.py @@ -153,11 +153,11 @@ class SkillScores: """ - def __init__(self, internal_data): + def __init__(self, internal_data: Data): """Set internal data.""" self.internal_data = internal_data - def skill_scores(self, window_lead_time): + def skill_scores(self, window_lead_time: int) -> pd.DataFrame: """ Calculate skill scores for all combinations of CNN, persistence and OLS. @@ -174,7 +174,7 @@ class SkillScores: self.general_skill_score(data, forecast_name="CNN", reference_name="OLS")] return skill_score - def climatological_skill_scores(self, external_data: Data, window_lead_time: int) -> Data: + def climatological_skill_scores(self, external_data: Data, window_lead_time: int) -> xr.DataArray: """ Calculate climatological skill scores according to Murphy (1988). diff --git a/src/plotting/postprocessing_plotting.py b/src/plotting/postprocessing_plotting.py index e1bf1c1a59ba308539597087a68b0df493612d6b..223add3d6ad34bcf4c13d87c08aab32e70d11636 100644 --- a/src/plotting/postprocessing_plotting.py +++ b/src/plotting/postprocessing_plotting.py @@ -20,7 +20,7 @@ import xarray as xr from matplotlib.backends.backend_pdf import PdfPages from src import helpers -from src.data_handling.data_generator import DataGenerator +from src.data_handling import DataGenerator from src.helpers import TimeTrackingWrapper logging.getLogger('matplotlib').setLevel(logging.WARNING) diff --git a/src/run_modules/post_processing.py b/src/run_modules/post_processing.py index fd9927fab331e2c9ef69c5d3ad6a78674c37ba22..caf764f74903f0b3a161d857e404cb3c243a4aba 100644 --- a/src/run_modules/post_processing.py +++ b/src/run_modules/post_processing.py @@ -1,32 +1,66 @@ +"""Post-processing module.""" + __author__ = "Lukas Leufen, Felix Kleinert" __date__ = '2019-12-11' import inspect import logging import os -from typing import Dict +from typing import Dict, Tuple, Union, List import keras import numpy as np import pandas as pd import xarray as xr -from src.data_handling.bootstraps import BootStraps -from src.data_handling.data_distributor import Distributor -from src.data_handling.data_generator import DataGenerator +from src.data_handling import BootStraps, Distributor, DataGenerator, DataPrep from src.helpers.datastore import NameNotFoundInDataStore from src.helpers import TimeTracking, statistics from src.model_modules.linear_model import OrdinaryLeastSquaredModel from src.model_modules.model_class import AbstractModelClass from src.plotting.postprocessing_plotting import PlotMonthlySummary, PlotStationMap, PlotClimatologicalSkillScore, \ PlotCompetitiveSkillScore, PlotTimeSeries, PlotBootstrapSkillScore, PlotAvailability, PlotConditionalQuantiles -# from src.plotting.postprocessing_plotting import plot_conditional_quantiles from src.run_modules.run_environment import RunEnvironment class PostProcessing(RunEnvironment): + """ + Perform post-processing for performance evaluation. + + Schedule of post-processing: + #. train a ordinary least squared model (ols) for reference + #. create forecasts for nn, ols, and persistence + #. evaluate feature importance with bootstrapped predictions + #. calculate skill scores + #. create plots + + Required objects [scope] from data store: + * `best_model` [.] or locally saved model plus `model_name` [model] and `model` [model] + * `generator` [train, val, test, train_val] + * `forecast_path` [.] + * `plot_path` [postprocessing] + * `experiment_path` [.] + * `target_var` [.] + * `sampling` [.] + * `window_lead_time` [.] + * `evaluate_bootstraps` [postprocessing] and if enabled: + + * `create_new_bootstraps` [postprocessing] + * `bootstrap_path` [postprocessing] + * `number_of_bootstraps` [postprocessing] + + Optional objects + * `batch_size` [model] + + Creates + * forecasts in `forecast_path` if enabled + * bootstraps in `bootstrap_path` if enabled + * plots in `plot_path` + + """ def __init__(self): + """Initialise and run post-processing.""" super().__init__() self.model: keras.Model = self._load_model() self.ols_model = None @@ -44,19 +78,23 @@ class PostProcessing(RunEnvironment): self._run() def _run(self): + # ols model with TimeTracking(): self.train_ols_model() logging.info("take a look on the next reported time measure. If this increases a lot, one should think to " "skip train_ols_model() whenever it is possible to save time.") + + # forecasts with TimeTracking(): self.make_prediction() logging.info("take a look on the next reported time measure. If this increases a lot, one should think to " "skip make_prediction() whenever it is possible to save time.") + self.calculate_test_score() # bootstraps - if self.data_store.get("evaluate_bootstraps", "general.postprocessing"): + if self.data_store.get("evaluate_bootstraps", "postprocessing"): with TimeTracking(name="calculate bootstraps"): - create_new_bootstraps = self.data_store.get("create_new_bootstraps", "general.postprocessing") + create_new_bootstraps = self.data_store.get("create_new_bootstraps", "postprocessing") self.bootstrap_postprocessing(create_new_bootstraps) # skill scores @@ -68,8 +106,13 @@ class PostProcessing(RunEnvironment): def bootstrap_postprocessing(self, create_new_bootstraps: bool, _iter: int = 0) -> None: """ - Create skill scores of bootstrapped data. Also creates these bootstraps if create_new_bootstraps is true or a - failure occurred during skill score calculation. Sets class attribute bootstrap_skill_scores. + Calculate skill scores of bootstrapped data. + + Create bootstrapped data if create_new_bootstraps is true or a failure occurred during skill score calculation + (this will happen by default, if no bootstrapped data is available locally). Set class attribute + bootstrap_skill_scores. This method is implemented in a recursive fashion, but is only allowed to call itself + once. + :param create_new_bootstraps: calculate all bootstrap predictions and overwrite already available predictions :param _iter: internal counter to reduce unnecessary recursive calls (maximum number is 2, otherwise something went wrong). @@ -87,15 +130,17 @@ class PostProcessing(RunEnvironment): def create_bootstrap_forecast(self) -> None: """ - Creates the bootstrapped predictions for all stations and variables. These forecasts are saved in bootstrap_path - with the names `bootstraps_{var}_{station}.nc` and `bootstraps_labels_{station}.nc`. + Create bootstrapped predictions for all stations and variables. + + These forecasts are saved in bootstrap_path with the names `bootstraps_{var}_{station}.nc` and + `bootstraps_labels_{station}.nc`. """ # forecast with TimeTracking(name=inspect.stack()[0].function): # extract all requirements from data store bootstrap_path = self.data_store.get("bootstrap_path") forecast_path = self.data_store.get("forecast_path") - number_of_bootstraps = self.data_store.get("number_of_bootstraps", "general.postprocessing") + number_of_bootstraps = self.data_store.get("number_of_bootstraps", "postprocessing") # set bootstrap class bootstraps = BootStraps(self.test_data, bootstrap_path, number_of_bootstraps) @@ -129,13 +174,14 @@ class PostProcessing(RunEnvironment): def calculate_bootstrap_skill_scores(self) -> Dict[str, xr.DataArray]: """ + Calculate skill score of bootstrapped variables. + Use already created bootstrap predictions and the original predictions (the not-bootstrapped ones) and calculate skill scores for the bootstraps. The result is saved as a xarray DataArray in a dictionary structure separated for each station (keys of dictionary). :return: The result dictionary with station-wise skill scores """ - with TimeTracking(name=inspect.stack()[0].function): # extract all requirements from data store bootstrap_path = self.data_store.get("bootstrap_path") @@ -176,17 +222,39 @@ class PostProcessing(RunEnvironment): score[station] = xr.DataArray(skill, dims=["boot_var", "ahead"]) return score - def _load_model(self): + def _load_model(self) -> keras.models: + """ + Load NN model either from data store or from local path. + + :return: the model + """ try: model = self.data_store.get("best_model") except NameNotFoundInDataStore: - logging.info("no model saved in data store. trying to load model from experiment path") + logging.info("No model was saved in data store. Try to load model from experiment path.") model_name = self.data_store.get("model_name", "model") model_class: AbstractModelClass = self.data_store.get("model", "model") model = keras.models.load_model(model_name, custom_objects=model_class.custom_objects) return model def plot(self): + """ + Create all plots. + + Plots are defined in experiment set up by `plot_list`. As default, all (following) plots are enabled: + + * :py:class:`PlotBootstrapSkillScore <src.plotting.postprocessing_plotting.PlotBootstrapSkillScore>` + * :py:class:`PlotConditionalQuantiles <src.plotting.postprocessing_plotting.PlotConditionalQuantiles>` + * :py:class:`PlotStationMap <src.plotting.postprocessing_plotting.PlotStationMap>` + * :py:class:`PlotMonthlySummary <src.plotting.postprocessing_plotting.PlotMonthlySummary>` + * :py:class:`PlotClimatologicalSkillScore <src.plotting.postprocessing_plotting.PlotClimatologicalSkillScore>` + * :py:class:`PlotCompetitiveSkillScore <src.plotting.postprocessing_plotting.PlotCompetitiveSkillScore>` + * :py:class:`PlotTimeSeries <src.plotting.postprocessing_plotting.PlotTimeSeries>` + * :py:class:`PlotAvailability <src.plotting.postprocessing_plotting.PlotAvailability>` + + .. note:: Bootstrap plots are only created if bootstraps are evaluated. + + """ logging.debug("Run plotting routines...") path = self.data_store.get("forecast_path") @@ -215,21 +283,27 @@ class PostProcessing(RunEnvironment): PlotAvailability(avail_data, plot_folder=self.plot_path) def calculate_test_score(self): + """Evaluate test score of model and save locally.""" test_score = self.model.evaluate_generator(generator=self.test_data_distributed.distribute_on_batches(), use_multiprocessing=False, verbose=0, steps=1) - logging.info(f"test score = {test_score}") - self._save_test_score(test_score) - - def _save_test_score(self, score): path = self.data_store.get("experiment_path") - with open(os.path.join(path, "test_scores.txt")) as f: - for index, item in enumerate(score): - f.write(f"{self.model.metrics[index]}, {item}\n") + with open(os.path.join(path, "test_scores.txt"), "a") as f: + for index, item in enumerate(test_score): + logging.info(f"{self.model.metrics_names[index]}, {item}") + f.write(f"{self.model.metrics_names[index]}, {item}\n") def train_ols_model(self): + """Train ordinary least squared model on train data.""" self.ols_model = OrdinaryLeastSquaredModel(self.train_data) def make_prediction(self): + """ + Create predictions for NN, OLS, and persistence and add true observation as reference. + + Predictions are filled in an array with full index range. Therefore, predictions can have missing values. All + predictions for a single station are stored locally under `<forecast/forecast_norm>_<station>_test.nc` and can + be found inside `forecast_path`. + """ logging.debug("start make_prediction") for i, _ in enumerate(self.test_data): data = self.test_data.get_data_generator(i) @@ -272,17 +346,48 @@ class PostProcessing(RunEnvironment): file = os.path.join(path, f"{prefix}_{data.station[0]}_test.nc") all_predictions.to_netcdf(file) - def _get_frequency(self): + def _get_frequency(self) -> str: + """Get frequency abbreviation.""" getter = {"daily": "1D", "hourly": "1H"} return getter.get(self._sampling, None) - def _create_observation(self, data, _, mean, std, transformation_method, normalised): + @staticmethod + def _create_observation(data: DataPrep, _, mean: xr.DataArray, std: xr.DataArray, transformation_method: str, + normalised: bool) -> xr.DataArray: + """ + Create observation as ground truth from given data. + + Inverse transformation is applied to the ground truth to get the output in the original space. + + :param data: transposed observation from DataPrep + :param mean: mean of target value transformation + :param std: standard deviation of target value transformation + :param transformation_method: target values transformation method + :param normalised: transform ground truth in original space if false, or use normalised predictions if true + + :return: filled data array with observation + """ obs = data.label.copy() if not normalised: obs = statistics.apply_inverse_transformation(obs, mean, std, transformation_method) return obs - def _create_ols_forecast(self, input_data, ols_prediction, mean, std, transformation_method, normalised): + def _create_ols_forecast(self, input_data: xr.DataArray, ols_prediction: xr.DataArray, mean: xr.DataArray, + std: xr.DataArray, transformation_method: str, normalised: bool) -> xr.DataArray: + """ + Create ordinary least square model forecast with given input data. + + Inverse transformation is applied to the forecast to get the output in the original space. + + :param data: transposed history from DataPrep + :param ols_prediction: empty array in right shape to fill with data + :param mean: mean of target value transformation + :param std: standard deviation of target value transformation + :param transformation_method: target values transformation method + :param normalised: transform prediction in original space if false, or use normalised predictions if true + + :return: filled data array with ols predictions + """ tmp_ols = self.ols_model.predict(input_data) if not normalised: tmp_ols = statistics.apply_inverse_transformation(tmp_ols, mean, std, transformation_method) @@ -291,7 +396,23 @@ class PostProcessing(RunEnvironment): ols_prediction.values = np.swapaxes(tmp_ols, 2, 0) if target_shape != tmp_ols.shape else tmp_ols return ols_prediction - def _create_persistence_forecast(self, data, persistence_prediction, mean, std, transformation_method, normalised): + def _create_persistence_forecast(self, data: DataPrep, persistence_prediction: xr.DataArray, mean: xr.DataArray, + std: xr.DataArray, transformation_method: str, normalised: bool) -> xr.DataArray: + """ + Create persistence forecast with given data. + + Persistence is deviated from the value at t=0 and applied to all following time steps (t+1, ..., t+window). + Inverse transformation is applied to the forecast to get the output in the original space. + + :param data: DataPrep + :param persistence_prediction: empty array in right shape to fill with data + :param mean: mean of target value transformation + :param std: standard deviation of target value transformation + :param transformation_method: target values transformation method + :param normalised: transform prediction in original space if false, or use normalised predictions if true + + :return: filled data array with persistence predictions + """ tmp_persi = data.observation.copy().sel({'window': 0}) if not normalised: tmp_persi = statistics.apply_inverse_transformation(tmp_persi, mean, std, transformation_method) @@ -300,17 +421,23 @@ class PostProcessing(RunEnvironment): axis=1) return persistence_prediction - def _create_nn_forecast(self, input_data, nn_prediction, mean, std, transformation_method, normalised): + def _create_nn_forecast(self, input_data: xr.DataArray, nn_prediction: xr.DataArray, mean: xr.DataArray, + std: xr.DataArray, transformation_method: str, normalised: bool) -> xr.DataArray: """ - create the nn forecast for given input data. Inverse transformation is applied to the forecast to get the output - in the original space. Furthermore, only the output of the main branch is returned (not all minor branches, if - the network has multiple output branches). The main branch is defined to be the last entry of all outputs. - :param input_data: - :param nn_prediction: - :param mean: - :param std: - :param transformation_method: - :return: + Create NN forecast for given input data. + + Inverse transformation is applied to the forecast to get the output in the original space. Furthermore, only the + output of the main branch is returned (not all minor branches, if the network has multiple output branches). The + main branch is defined to be the last entry of all outputs. + + :param input_data: transposed history from DataPrep + :param nn_prediction: empty array in right shape to fill with data + :param mean: mean of target value transformation + :param std: standard deviation of target value transformation + :param transformation_method: target values transformation method + :param normalised: transform prediction in original space if false, or use normalised predictions if true + + :return: filled data array with nn predictions """ tmp_nn = self.model.predict(input_data) if not normalised: @@ -330,11 +457,15 @@ class PostProcessing(RunEnvironment): return [generator.label.copy() for _ in range(count)] @staticmethod - def create_fullindex(df, freq): - # Diese Funkton erstellt ein leeres df, mit Index der Frequenz frequ zwischen dem ersten und dem letzten Datum in df - # param: df as pandas dataframe - # param: freq as string - # return: index as pandas dataframe + def create_fullindex(df: Union[xr.DataArray, pd.DataFrame, pd.DatetimeIndex], freq: str) -> pd.DataFrame: + """ + Create full index from first and last date inside df and resample with given frequency. + + :param df: use time range of this data set + :param freq: frequency of full index + + :return: empty data frame with full index. + """ if isinstance(df, pd.DataFrame): earliest = df.index[0] latest = df.index[-1] @@ -351,13 +482,14 @@ class PostProcessing(RunEnvironment): return index @staticmethod - def create_forecast_arrays(index, ahead_names, **kwargs): + def create_forecast_arrays(index: pd.DataFrame, ahead_names: List[Union[str, int]], **kwargs): """ - This function combines different forecast types into one xarray. + Combine different forecast types into single xarray. - :param index: as index; index for forecasts (e.g. time) - :param ahead_names: as list of str/int: names of ahead values (e.g. hours or days) + :param index: index for forecasts (e.g. time) + :param ahead_names: names of ahead values (e.g. hours or days) :param kwargs: as xarrays; data of forecasts + :return: xarray of dimension 3: index, ahead_names, # predictions """ @@ -373,7 +505,15 @@ class PostProcessing(RunEnvironment): res.loc[match_index, :, k] = v.sel({'datetime': match_index}).squeeze('Stations').transpose() return res - def _get_external_data(self, station): + def _get_external_data(self, station: str) -> Union[xr.DataArray, None]: + """ + Get external data for given station. + + External data is defined as data that is not part of the observed period. From an evaluation perspective, this + refers to data, that is no test data, and therefore to train and val data. + + :param station: name of station to load external data. + """ try: data = self.train_val_data.get_data_generator(station) mean, std, transformation_method = data.get_transformation_information(variable=self.target_var) @@ -383,7 +523,16 @@ class PostProcessing(RunEnvironment): except KeyError: return None - def calculate_skill_scores(self): + def calculate_skill_scores(self) -> Tuple[Dict, Dict]: + """ + Calculate skill scores of CNN forecast. + + The competitive skill score compares the CNN prediction with persistence and ordinary least squares forecasts. + Whereas, the climatological skill scores evaluates the CNN prediction in terms of meaningfulness in comparison + to different climatological references. + + :return: competitive and climatological skill scores + """ path = self.data_store.get("forecast_path") window_lead_time = self.data_store.get("window_lead_time") skill_score_competitive = {} diff --git a/src/run_modules/pre_processing.py b/src/run_modules/pre_processing.py index bc55ad7f18d256d1126b5ad6581634812e7ea8dd..b4b36a20bf9ed7827a1ac151141c13122f517e33 100644 --- a/src/run_modules/pre_processing.py +++ b/src/run_modules/pre_processing.py @@ -1,3 +1,5 @@ +"""Pre-processing module.""" + __author__ = "Lukas Leufen, Felix Kleinert" __date__ = '2019-11-25' @@ -8,7 +10,7 @@ from typing import Tuple, Dict, List import numpy as np import pandas as pd -from src.data_handling.data_generator import DataGenerator +from src.data_handling import DataGenerator from src.helpers import TimeTracking from src.configuration import path_config from src.helpers.join import EmptyQueryResult diff --git a/src/run_modules/training.py b/src/run_modules/training.py index 8bbd7723d1ae9bdb049af43026058175e49403ef..8cb4726fdc84ad10e62106c1d2bcbf899457e31d 100644 --- a/src/run_modules/training.py +++ b/src/run_modules/training.py @@ -11,7 +11,7 @@ from typing import Union import keras from keras.callbacks import Callback, History -from src.data_handling.data_distributor import Distributor +from src.data_handling import Distributor from src.model_modules.keras_extensions import CallbackHandler from src.plotting.training_monitoring import PlotModelHistory, PlotModelLearningRate from src.run_modules.run_environment import RunEnvironment @@ -21,6 +21,9 @@ class Training(RunEnvironment): """ Train your model with this module. + This module isn't required to run, if only a fresh post-processing is preformed. Either remove training call from + your run script or set create_new_model and trainable both to false. + Schedule of training: #. set_generators(): set generators for training, validation and testing and distribute according to batch size #. make_predict_function(): create predict function before distribution on multiple nodes (detailed information diff --git a/test/test_modules/test_pre_processing.py b/test/test_modules/test_pre_processing.py index 3aac2abfd51cfc7d75b54baae3eab5a1828f7318..338bb7fd5cd1cb9a743cbc54ec9c4ed388a6bdaf 100644 --- a/test/test_modules/test_pre_processing.py +++ b/test/test_modules/test_pre_processing.py @@ -63,7 +63,7 @@ class TestPreProcessing: def test_create_set_split_not_all_stations(self, caplog, obj_with_exp_setup): caplog.set_level(logging.DEBUG) - obj_with_exp_setup.data_store.set("use_all_stations_on_all_data_sets", False, "general.awesome") + obj_with_exp_setup.data_store.set("use_all_stations_on_all_data_sets", False, "general") obj_with_exp_setup.create_set_split(slice(0, 2), "awesome") assert caplog.record_tuples[0] == ('root', 10, "Awesome stations (len=2): ['DEBW107', 'DEBY081']") data_store = obj_with_exp_setup.data_store