diff --git a/src/data_handling/bootstraps.py b/src/data_handling/bootstraps.py
index 9f61dace61af9e5ba2dc9176fe0d199047a3723f..412cb4448180e89462a270930ec710936a203db3 100644
--- a/src/data_handling/bootstraps.py
+++ b/src/data_handling/bootstraps.py
@@ -2,7 +2,6 @@
 __author__ = 'Felix Kleinert, Lukas Leufen'
 __date__ = '2020-02-07'
 
-from src.run_modules.run_environment import RunEnvironment
 from src.data_handling.data_generator import DataGenerator
 import numpy as np
 import logging
@@ -11,6 +10,7 @@ import xarray as xr
 import os
 import re
 from src import helpers
+from typing import List
 
 
 class BootStrapGenerator:
@@ -23,7 +23,6 @@ class BootStrapGenerator:
         self.chunksize = chunksize
         self.bootstrap_path = bootstrap_path
         self._iterator = 0
-        self.bootstrap_meta = []
 
     def __len__(self):
         """
@@ -47,7 +46,6 @@ class BootStrapGenerator:
                 station = self.orig_generator.get_station_key(i)
                 logging.info(f"station: {station}")
                 hist, label = data
-                len_of_label = len(label)
                 shuffled_data = self.load_boot_data(station)
                 for var in self.variables:
                     logging.info(f" var: {var}")
@@ -57,34 +55,21 @@ class BootStrapGenerator:
                         shuffled_var = shuffled_data.sel(variables=var, boots=boot).expand_dims("variables").drop("boots").transpose("datetime", "window", "Stations", "variables")
                         boot_hist = boot_hist.combine_first(shuffled_var)
                         boot_hist = boot_hist.sortby("variables")
-                        self.bootstrap_meta.extend([[var, station]]*len_of_label)
                         yield boot_hist, label
             return
 
-    def get_generator_refactored(self):
+    def get_bootstrap_meta(self) -> List:
         """
-        This is the implementation of the __next__ method of the iterator protocol. Get the data generator, and return
-        the history and label data of this generator.
-        :return:
+        Create meta data on ordering of variable bootstraps according to ordering from get_generator method.
+        :return: list with bootstrapped variable first and its corresponding station second.
         """
-        while True:
-            for i, data in enumerate(self.orig_generator):
-                station = self.orig_generator.get_station_key(i)
-                logging.info(f"station: {station}")
-                hist, label = data
-                len_of_label = len(label)
-                shuffled_data = self.load_boot_data(station)
-                for var in self.variables:
-                    logging.info(f" var: {var}")
-                    for boot in range(self.boots):
-                        logging.debug(f"boot: {boot}")
-                        boot_hist = hist.sel(variables=helpers.list_pop(self.variables, var))
-                        shuffled_var = shuffled_data.sel(variables=var, boots=boot).expand_dims("variables").drop("boots").transpose("datetime", "window", "Stations", "variables")
-                        boot_hist = boot_hist.combine_first(shuffled_var)
-                        boot_hist = boot_hist.sortby("variables")
-                        self.bootstrap_meta.extend([[var, station]]*len_of_label)
-                        yield boot_hist, label, var, station
-            return
+        bootstrap_meta = []
+        for station in self.stations:
+            label = self.orig_generator.get_data_generator(station).get_transposed_label()
+            for var in self.variables:
+                for boot in range(self.boots):
+                    bootstrap_meta.extend([[var, station]] * len(label))
+        return bootstrap_meta
 
     def get_orig_prediction(self, path, file_name, prediction_name="CNN"):
         file = os.path.join(path, file_name)
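Note (illustration, not part of the patch): get_bootstrap_meta() replaces the old side-effect bookkeeping of get_generator_refactored() by recomputing the same ordering up front. A minimal, self-contained sketch of that ordering contract, with invented station names, label counts and loop bounds:

# per station: variables in the outer loop, boots in the inner loop,
# each combination repeated once per label sample
stations = {"station_a": 4, "station_b": 3}   # station -> number of label samples (invented)
variables = ["o3", "temp"]
boots = 2                                     # number of bootstrap realisations

generator_order = []   # (variable, station) per sample, in the order the generator emits them
bootstrap_meta = []    # what get_bootstrap_meta() rebuilds without running the generator

for station, n_labels in stations.items():
    for var in variables:
        for boot in range(boots):
            generator_order.extend([(var, station)] * n_labels)
            bootstrap_meta.extend([[var, station]] * n_labels)

# meta row i describes prediction row i
assert all(tuple(m) == g for m, g in zip(bootstrap_meta, generator_order))
assert len(bootstrap_meta) == (4 + 3) * len(variables) * boots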
""" - while True: - for i, data in enumerate(self.orig_generator): - station = self.orig_generator.get_station_key(i) - logging.info(f"station: {station}") - hist, label = data - len_of_label = len(label) - shuffled_data = self.load_boot_data(station) - for var in self.variables: - logging.info(f" var: {var}") - for boot in range(self.boots): - logging.debug(f"boot: {boot}") - boot_hist = hist.sel(variables=helpers.list_pop(self.variables, var)) - shuffled_var = shuffled_data.sel(variables=var, boots=boot).expand_dims("variables").drop("boots").transpose("datetime", "window", "Stations", "variables") - boot_hist = boot_hist.combine_first(shuffled_var) - boot_hist = boot_hist.sortby("variables") - self.bootstrap_meta.extend([[var, station]]*len_of_label) - yield boot_hist, label, var, station - return + bootstrap_meta = [] + for station in self.stations: + label = self.orig_generator.get_data_generator(station).get_transposed_label() + for var in self.variables: + for boot in range(self.boots): + bootstrap_meta.extend([[var, station]] * len(label)) + return bootstrap_meta def get_orig_prediction(self, path, file_name, prediction_name="CNN"): file = os.path.join(path, file_name) @@ -100,11 +85,9 @@ class BootStrapGenerator: return shuffled_data -class BootStraps(RunEnvironment): +class BootStraps: def __init__(self, data, bootstrap_path, number_bootstraps=10): - - super().__init__() self.data: DataGenerator = data self.number_bootstraps = number_bootstraps self.bootstrap_path = bootstrap_path @@ -113,14 +96,11 @@ class BootStraps(RunEnvironment): self._boot_strap_generator = BootStrapGenerator(self.data, self.number_bootstraps, self.chunks, self.bootstrap_path) def get_boot_strap_meta(self): - return self._boot_strap_generator.bootstrap_meta + return self._boot_strap_generator.get_bootstrap_meta() def boot_strap_generator(self): return self._boot_strap_generator.get_generator() - def boot_strap_generator_refactored(self): - return self._boot_strap_generator.get_generator_refactored() - def get_boot_strap_generator_length(self): return self._boot_strap_generator.__len__() diff --git a/src/run_modules/post_processing.py b/src/run_modules/post_processing.py index b8f510bd777c6c2140f9d7b9314ae10f931caf67..f2d5a7d9b528beae954b2671effd6400a694f4aa 100644 --- a/src/run_modules/post_processing.py +++ b/src/run_modules/post_processing.py @@ -5,7 +5,6 @@ __date__ = '2019-12-11' import logging import os -import dask.array as da import keras import numpy as np import pandas as pd @@ -51,62 +50,40 @@ class PostProcessing(RunEnvironment): self.make_prediction() logging.info("take a look on the next reported time measure. 
diff --git a/src/run_modules/post_processing.py b/src/run_modules/post_processing.py
index b8f510bd777c6c2140f9d7b9314ae10f931caf67..f2d5a7d9b528beae954b2671effd6400a694f4aa 100644
--- a/src/run_modules/post_processing.py
+++ b/src/run_modules/post_processing.py
@@ -5,7 +5,6 @@ __date__ = '2019-12-11'
 import logging
 import os
 
-import dask.array as da
 import keras
 import numpy as np
 import pandas as pd
@@ -51,62 +50,40 @@ class PostProcessing(RunEnvironment):
         self.make_prediction()
         logging.info("take a look on the next reported time measure. If this increases a lot, one should think to "
                      "skip make_prediction() whenever it is possible to save time.")
-        if self.data_store.get("evaluate_bootstraps", "general.postprocessing"):
-            self.bootstrap_skill_scores = self.create_boot_straps_refactored()
+
+        # skill scores
         self.skill_scores = self.calculate_skill_scores()
-        self.plot()
 
-    def create_boot_straps_refactored(self):
+        # bootstraps
+        if self.data_store.get("evaluate_bootstraps", "general.postprocessing"):
+            self.create_boot_straps()
+            self.bootstrap_skill_scores = self.calculate_bootstrap_skill_scores()
 
-        bootstrap_path = self.data_store.get("bootstrap_path", "general")
-        forecast_path = self.data_store.get("forecast_path", "general")
-        window_lead_time = self.data_store.get("window_lead_time", "general")
-        bootstraps = BootStraps(self.test_data, bootstrap_path, 20)
-        bootstrap_predictions = []
-        bootstrap_labels = []
-        keras.backend.set_learning_phase(0)
-        with TimeTracking(name="boot predictions"):
-            station_previous = None
-            for boot in bootstraps.boot_strap_generator_refactored():
-                input_data, label, variable, station = boot
-                predictions = self.model.predict(input_data)
-                if isinstance(predictions, list):
-                    predictions = predictions[-1]
-
-                predictions = np.expand_dims(predictions, 2)
-                coords = (range(predictions.shape[0]), range(1, window_lead_time + 1))
-                tmp = xr.DataArray(predictions, coords=(*coords, [variable]), dims=["index", "ahead", "type"])
-                file_name = os.path.join(forecast_path, f"bootstraps_{variable}_{station}.nc")
-                tmp.to_netcdf(file_name)
-                if station_previous != station:
-                    labels = label.assign_coords(type="obs").expand_dims("type").drop(["Stations", "variables"]).rename({"datetime": "index", "window": "ahead"})
-                    file_name = os.path.join(forecast_path, f"bootstraps_labels_{station}.nc")
-                    # labels = xr.DataArray(labels, coords=(*coords, ["obs"]), dims=["index", "ahead", "type"])
-                    labels.to_netcdf(file_name)
-                station_previous = station
-
-        # stopped here, this implementation is slower, than the old one, take a look on
-        # https://towardsdatascience.com/keras-data-generators-and-how-to-use-them-b69129ed779c
+        # plotting
+        self.plot()
 
     def create_boot_straps(self):
-        # forecast
-
-        bootstrap_path = self.data_store.get("bootstrap_path", "general")
-        forecast_path = self.data_store.get("forecast_path", "general")
-        window_lead_time = self.data_store.get("window_lead_time", "general")
-        bootstraps = BootStraps(self.test_data, bootstrap_path, 20)
         with TimeTracking(name="boot predictions"):
+            bootstrap_path = self.data_store.get("bootstrap_path", "general")
+            forecast_path = self.data_store.get("forecast_path", "general")
+            window_lead_time = self.data_store.get("window_lead_time", "general")
+            bootstraps = BootStraps(self.test_data, bootstrap_path, 20)
+            # make bootstrap predictions
             bootstrap_predictions = self.model.predict_generator(generator=bootstraps.boot_strap_generator(),
-                                                                 steps=bootstraps.get_boot_strap_generator_length())
+                                                                 steps=bootstraps.get_boot_strap_generator_length(),
+                                                                 use_multiprocessing=True)
             if isinstance(bootstrap_predictions, list):
                 bootstrap_predictions = bootstrap_predictions[-1]
+            # get bootstrap prediction meta data
            bootstrap_meta = np.array(bootstraps.get_boot_strap_meta())
+            # save bootstrap predictions separately for each station and variable combination
             variables = np.unique(bootstrap_meta[:, 0])
             for station in np.unique(bootstrap_meta[:, 1]):
                 coords = None
                 for boot in variables:
+                    # store each variable - station - combination
                     ind = np.all(bootstrap_meta == [boot, station], axis=1)
                     length = sum(ind)
                     sel = bootstrap_predictions[ind].reshape((length, window_lead_time, 1))
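Note (illustration, not part of the patch): create_boot_straps() splits the flat predict_generator output back into per-station, per-variable blocks purely through the meta array, as in the mask-and-reshape above. A self-contained toy example with invented meta rows and prediction values:

import numpy as np

window_lead_time = 3
bootstrap_meta = np.array([["o3", "station_a"]] * 4 + [["temp", "station_a"]] * 4)
bootstrap_predictions = np.arange(8 * window_lead_time).reshape(8, window_lead_time)

ind = np.all(bootstrap_meta == ["temp", "station_a"], axis=1)  # rows of this combination
length = sum(ind)                                              # 4 samples
sel = bootstrap_predictions[ind].reshape((length, window_lead_time, 1))
print(sel.shape)   # (4, 3, 1) -> ("index", "ahead", "type") once wrapped in a DataArray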
@@ -114,37 +91,43 @@ class PostProcessing(RunEnvironment):
                     tmp = xr.DataArray(sel, coords=(*coords, [boot]), dims=["index", "ahead", "type"])
                     file_name = os.path.join(forecast_path, f"bootstraps_{boot}_{station}.nc")
                     tmp.to_netcdf(file_name)
+                # store also true labels for each station
                 labels = bootstraps.get_labels(station).reshape((length, window_lead_time, 1))
                 file_name = os.path.join(forecast_path, f"bootstraps_labels_{station}.nc")
                 labels = xr.DataArray(labels, coords=(*coords, ["obs"]), dims=["index", "ahead", "type"])
                 labels.to_netcdf(file_name)
-        # file_name = os.path.join(forecast_path, f"bootstraps_orig.nc")
-        # orig = xr.open_dataarray(file_name)
-
-
-        # calc skill scores
-        skill_scores = statistics.SkillScores(None)
-        score = {}
-        for station in np.unique(bootstrap_meta[:, 1]):
-            file_name = os.path.join(forecast_path, f"bootstraps_labels_{station}.nc")
-            labels = xr.open_dataarray(file_name)
-            shape = labels.shape
-            orig = bootstraps.get_orig_prediction(forecast_path, f"forecasts_norm_{station}_test.nc").reshape(shape)
-            orig = xr.DataArray(orig, coords=(range(shape[0]), range(1, shape[1] + 1), ["orig"]), dims=["index", "ahead", "type"])
-            skill = pd.DataFrame(columns=range(1, window_lead_time + 1))
-            for boot in variables:
-                file_name = os.path.join(forecast_path, f"bootstraps_{boot}_{station}.nc")
-                boot_data = xr.open_dataarray(file_name)
-                boot_data = boot_data.combine_first(labels)
-                boot_data = boot_data.combine_first(orig)
-                boot_scores = []
-                for iahead in range(window_lead_time):
-                    data = boot_data.sel(ahead=iahead + 1)
-                    boot_scores.append(skill_scores.general_skill_score(data, forecast_name=boot, reference_name="orig"))
-                skill.loc[boot] = np.array(boot_scores)
-            score[station] = xr.DataArray(skill, dims=["boot_var", "ahead"])
-        return score
+    def calculate_bootstrap_skill_scores(self):
+
+        with TimeTracking(name="boot skill scores"):
+
+            bootstrap_path = self.data_store.get("bootstrap_path", "general")
+            forecast_path = self.data_store.get("forecast_path", "general")
+            window_lead_time = self.data_store.get("window_lead_time", "general")
+            bootstraps = BootStraps(self.test_data, bootstrap_path, 20)
+
+            # calc skill scores
+            skill_scores = statistics.SkillScores(None)
+            score = {}
+            for station in self.test_data.stations:
+                file_name = os.path.join(forecast_path, f"bootstraps_labels_{station}.nc")
+                labels = xr.open_dataarray(file_name)
+                shape = labels.shape
+                orig = bootstraps.get_orig_prediction(forecast_path, f"forecasts_norm_{station}_test.nc").reshape(shape)
+                coords = (range(shape[0]), range(1, shape[1] + 1), ["orig"])
+                orig = xr.DataArray(orig, coords=coords, dims=["index", "ahead", "type"])
+                skill = pd.DataFrame(columns=range(1, window_lead_time + 1))
+                for boot in self.test_data.variables:
+                    file_name = os.path.join(forecast_path, f"bootstraps_{boot}_{station}.nc")
+                    boot_data = xr.open_dataarray(file_name)
+                    boot_data = boot_data.combine_first(labels).combine_first(orig)
+                    boot_scores = []
+                    for ahead in range(1, window_lead_time + 1):
+                        data = boot_data.sel(ahead=ahead)
+                        boot_scores.append(skill_scores.general_skill_score(data, forecast_name=boot, reference_name="orig"))
+                    skill.loc[boot] = np.array(boot_scores)
+                score[station] = xr.DataArray(skill, dims=["boot_var", "ahead"])
+            return score
 
     def _load_model(self):
         try:
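Note (illustration, not part of the patch): calculate_bootstrap_skill_scores() relies on the bootstrap forecast, the observations and the original forecast sharing the "index" and "ahead" dimensions while differing only in their "type" coordinate, so combine_first stacks them into one array and a single sel(ahead=...) slice carries everything general_skill_score needs. A self-contained toy example with random data and an invented variable name:

import numpy as np
import xarray as xr

n_index, window_lead_time = 5, 3

def as_type(values, name):
    # wrap an (index, ahead) array and tag it with a single "type" coordinate
    return xr.DataArray(values.reshape(n_index, window_lead_time, 1),
                        coords=(range(n_index), range(1, window_lead_time + 1), [name]),
                        dims=["index", "ahead", "type"])

boot = as_type(np.random.rand(n_index, window_lead_time), "temp")  # forecast with "temp" bootstrapped
obs = as_type(np.random.rand(n_index, window_lead_time), "obs")    # true labels
orig = as_type(np.random.rand(n_index, window_lead_time), "orig")  # unmodified reference forecast

boot_data = boot.combine_first(obs).combine_first(orig)
print(boot_data.sel(ahead=1).coords["type"].values)   # contains 'obs', 'orig' and 'temp'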