From dd11e6eaba1852382b9ff61b91a201a7e82b7a54 Mon Sep 17 00:00:00 2001 From: lukas leufen <l.leufen@fz-juelich.de> Date: Tue, 31 Mar 2020 14:52:31 +0200 Subject: [PATCH] removed many unnecessary and duplicated methods --- src/data_handling/bootstraps.py | 368 ++++++++--------------------- src/run_modules/post_processing.py | 21 +- 2 files changed, 112 insertions(+), 277 deletions(-) diff --git a/src/data_handling/bootstraps.py b/src/data_handling/bootstraps.py index 8888f6b1..8fc0e62a 100644 --- a/src/data_handling/bootstraps.py +++ b/src/data_handling/bootstraps.py @@ -14,17 +14,20 @@ from src import helpers from typing import List, Union, Pattern -class RealBootStrapGenerator(keras.utils.Sequence): +class RealDataGenerator(keras.utils.Sequence): + + def __init__(self, data, number_of_boots): + self.data = data + self.number_of_boots = number_of_boots + + def __len__(self): + return self.number_of_boots def __getitem__(self, index): - logging.debug(f"boot: {index}") - boot_hist = self.history.copy() - boot_hist = boot_hist.combine_first(self.__get_shuffled(index)) - return boot_hist.reindex_like(self.history_orig) + return self.data - def __get_shuffled(self, index): - shuffled_var = self.shuffled.sel(boots=index).expand_dims("variables").drop("boots") - return shuffled_var.transpose("datetime", "window", "Stations", "variables") + +class RealBootStrapGenerator(keras.utils.Sequence): def __init__(self, number_of_boots, history, shuffled, variables, shuffled_variable): self.number_of_boots = number_of_boots @@ -36,131 +39,103 @@ class RealBootStrapGenerator(keras.utils.Sequence): def __len__(self): return self.number_of_boots + def __getitem__(self, index): + logging.debug(f"boot: {index}") + boot_hist = self.history.copy() + boot_hist = boot_hist.combine_first(self.__get_shuffled(index)) + return boot_hist.reindex_like(self.history_orig) -class BootStrapGeneratorNew: + def __get_shuffled(self, index): + shuffled_var = self.shuffled.sel(boots=index).expand_dims("variables").drop("boots") + return shuffled_var.transpose("datetime", "window", "Stations", "variables") - def __init__(self, orig_generator, number_of_boots, bootstrap_path): - self.orig_generator: DataGenerator = orig_generator - self.stations = self.orig_generator.stations - self.variables = self.orig_generator.variables - self.number_of_boots = number_of_boots - self.bootstrap_path = bootstrap_path - def __len__(self): - return len(self.orig_generator) * self.number_of_boots +class CreateShuffledData: - def get_generator_station_var_wise(self, station, var): - """ - This is the implementation of the __next__ method of the iterator protocol. Get the data generator, and return - the history and label data of this generator. - :return: - """ - hist, label = self.orig_generator[station] - shuffled_data = self.load_shuffled_data(station, self.variables) - gen = RealBootStrapGenerator(self.number_of_boots, hist, shuffled_data, self.variables, var) - return hist, label, gen, self.number_of_boots - - def get_bootstrap_meta_station_var_wise(self, station, var) -> List: - """ - Create meta data on ordering of variable bootstraps according to ordering from get_generator method. - :return: list with bootstrapped variable first and its corresponding station second. - """ - bootstrap_meta = [] - label = self.orig_generator.get_data_generator(station).get_transposed_label() - for boot in range(self.number_of_boots): - bootstrap_meta.extend([[var, station]] * len(label)) - return bootstrap_meta + def __init__(self, data: DataGenerator, number_of_bootstraps, bootstrap_path): + self.data = data + self.number_of_bootstraps = number_of_bootstraps + self.bootstrap_path = bootstrap_path + self.create_shuffled_data() - def get_labels(self, key: Union[str, int]): + def create_shuffled_data(self): """ - Reepats labels for given key by the number of boots and yield it one by one. - :param key: key of station (either station name as string or the position in generator as integer) - :return: yields labels for length of boots + Create shuffled data. Use original test data, add dimension 'boots' with length number of bootstraps and insert + randomly selected variables. If there is a suitable local file for requested window size and number of + bootstraps, no additional file will be created inside this function. """ - _, label = self.orig_generator[key] - for _ in range(self.number_of_boots): - yield label + logging.info("create / check shuffled bootstrap data") + variables_str = '_'.join(sorted(self.data.variables)) + window = self.data.window_history_size + for station in self.data.stations: + valid, nboot = self.valid_bootstrap_file(station, variables_str, window) + if not valid: + logging.info(f'create bootstap data for {station}') + hist = self.data.get_data_generator(station).get_transposed_history() + file_path = self._set_file_path(station, variables_str, window, nboot) + hist = hist.expand_dims({'boots': range(nboot)}, axis=-1) + shuffled_variable = [] + chunks = (100, *hist.shape[1:3], hist.shape[-1]) + for i, var in enumerate(hist.coords['variables']): + single_variable = hist.sel(variables=var).values + shuffled_variable.append(self.shuffle_single_variable(single_variable, chunks=chunks)) + shuffled_variable_da = da.stack(shuffled_variable, axis=-2).rechunk("auto") + shuffled_data = xr.DataArray(shuffled_variable_da, coords=hist.coords, dims=hist.dims) + shuffled_data.to_netcdf(file_path) - def get_orig_prediction(self, path: str, file_name: str, prediction_name: str = "CNN"): - """ - Repeats predictions from given file(_name) in path by the number of boots. - :param path: path to file - :param file_name: file name - :param prediction_name: name of the prediction to select from loaded file - :return: yields predictions for length of boots - """ - file = os.path.join(path, file_name) - data = xr.open_dataarray(file) - for _ in range(self.number_of_boots): - yield data.sel(type=prediction_name).squeeze() + def _set_file_path(self, station, variables, window, nboots): + file_name = f"{station}_{variables}_hist{window}_nboots{nboots}_shuffled.nc" + return os.path.join(self.bootstrap_path, file_name) - def load_shuffled_data(self, station: str, variables: List[str]) -> xr.DataArray: + def valid_bootstrap_file(self, station, variables, window): """ - Load shuffled data from bootstrap path. Data is stored as - '<station>_<var1>_<var2>_..._hist<histsize>_nboots<nboots>_shuffled.nc', e.g. - 'DEBW107_cloudcover_no_no2_temp_u_v_hist13_nboots20_shuffled.nc' + Compare local bootstrap file with given settings for station, variables, window and number of bootstraps. If a + match was found, this method returns a tuple (True, None). In any other case, it returns (False, max_nboot), + where max_nboot is the highest boot number found in the local storage. A match is defined so that the window + length is ge than given window size form args and the number of boots is also ge than the given number of boots + from this class. Furthermore, this functions deletes local files, if the match the station pattern but don't fit + the window and bootstrap condition. This is performed, because it is assumed, that the corresponding file will + be created with a longer or at the least same window size and numbers of bootstraps. :param station: :param variables: - :return: shuffled data as xarray - """ - file_name = self.get_shuffled_data_file(station, variables) - shuffled_data = xr.open_dataarray(file_name, chunks=100) - return shuffled_data - - def get_shuffled_data_file(self, station, variables): - files = os.listdir(self.bootstrap_path) - regex = self.create_file_regex(station, variables) - file = self.filter_files(regex, files, self.orig_generator.window_history_size, self.number_of_boots) - if file: - return os.path.join(self.bootstrap_path, file) - else: - raise FileNotFoundError(f"Could not find a file to match pattern {regex}") - - @staticmethod - def create_file_regex(station: str, variables: List[str]) -> Pattern: - """ - Creates regex for given station and variables to look for shuffled data with pattern: - `<station>(_<var>)*_hist(<hist>)_nboots(<nboots>)_shuffled.nc` - :param station: station name to use as prefix - :param variables: variables to add after station - :return: compiled regular expression + :param window: + :return: """ - var_regex = "".join([rf"(_\w+)*_{v}(_\w+)*" for v in sorted(variables)]) - regex = re.compile(rf"{station}{var_regex}_hist(\d+)_nboots(\d+)_shuffled\.nc") - return regex + regex = re.compile(rf"{station}_{variables}_hist(\d+)_nboots(\d+)_shuffled") + max_nboot = self.number_of_bootstraps + for file in os.listdir(self.bootstrap_path): + match = regex.match(file) + if match: + window_file = int(match.group(1)) + nboot_file = int(match.group(2)) + max_nboot = max([max_nboot, nboot_file]) + if (window_file >= window) and (nboot_file >= self.number_of_bootstraps): + return True, None + else: + os.remove(os.path.join(self.bootstrap_path, file)) + return False, max_nboot @staticmethod - def filter_files(regex: Pattern, files: List[str], window: int, nboot: int) -> Union[str, None]: - """ - Filter list of files by regex. Regex has to be structured to match the following string structure - `<station>(_<var>)*_hist(<hist>)_nboots(<nboots>)_shuffled.nc`. Hist and nboots values have to be included as - group. All matches are compared to given window and nboot parameters. A valid file must have the same value (or - larger) than these parameters and contain all variables. - :param regex: compiled regular expression pattern following the style from method description - :param files: list of file names to filter - :param window: minimum length of window to look for - :param nboot: minimal number of boots to search - :return: matching file name or None, if no valid file was found - """ - for f in files: - match = regex.match(f) - if match: - last = match.lastindex - if (int(match.group(last-1)) >= window) and (int(match.group(last)) >= nboot): - return f + def shuffle_single_variable(data: da.array, chunks) -> da.core.Array: + size = data.shape + return da.random.choice(data.reshape(-1,), size=size, chunks=chunks) -class BootStrapGenerator: +class BootStraps: - def __init__(self, orig_generator, number_of_boots, bootstrap_path): - self.orig_generator: DataGenerator = orig_generator - self.stations = self.orig_generator.stations - self.variables = self.orig_generator.variables - self.number_of_boots = number_of_boots + def __init__(self, data, bootstrap_path, number_of_bootstraps=10): + self.data: DataGenerator = data + self.number_of_bootstraps = number_of_bootstraps self.bootstrap_path = bootstrap_path + CreateShuffledData(data, number_of_bootstraps, bootstrap_path) - def __len__(self): - return len(self.orig_generator) * self.number_of_boots + @property + def stations(self): + return self.data.stations + + @property + def variables(self): + return self.data.variables def get_generator_station_var_wise(self, station, var): """ @@ -168,33 +143,9 @@ class BootStrapGenerator: the history and label data of this generator. :return: """ - hist, label = self.orig_generator[station] - shuffled_data = self.load_shuffled_data(station, self.variables) - - def f(): - while True: - for boot in range(self.number_of_boots): - logging.debug(f"boot: {boot}") - boot_hist = hist.sel(variables=helpers.list_pop(self.variables, var)) - shuffled_var = shuffled_data.sel(variables=var, boots=boot).expand_dims("variables").drop("boots") - shuffled_var = shuffled_var.transpose("datetime", "window", "Stations", "variables") - boot_hist = boot_hist.combine_first(shuffled_var) - boot_hist = boot_hist.reindex_like(hist) - yield boot_hist - return - - return hist, label, f, self.number_of_boots - - def get_bootstrap_meta_station_var_wise(self, station, var) -> List: - """ - Create meta data on ordering of variable bootstraps according to ordering from get_generator method. - :return: list with bootstrapped variable first and its corresponding station second. - """ - bootstrap_meta = [] - label = self.orig_generator.get_data_generator(station).get_transposed_label() - for boot in range(self.number_of_boots): - bootstrap_meta.extend([[var, station]] * len(label)) - return bootstrap_meta + hist, _ = self.data[station] + shuffled_data = self._load_shuffled_data(station, self.variables) + return RealBootStrapGenerator(self.number_of_bootstraps, hist, shuffled_data, self.variables, var) def get_labels(self, key: Union[str, int]): """ @@ -202,9 +153,8 @@ class BootStrapGenerator: :param key: key of station (either station name as string or the position in generator as integer) :return: yields labels for length of boots """ - _, label = self.orig_generator[key] - for _ in range(self.number_of_boots): - yield label + labels = self.data[key][1] + return labels.data.repeat(self.number_of_bootstraps, axis=0) def get_orig_prediction(self, path: str, file_name: str, prediction_name: str = "CNN"): """ @@ -215,11 +165,11 @@ class BootStrapGenerator: :return: yields predictions for length of boots """ file = os.path.join(path, file_name) - data = xr.open_dataarray(file) - for _ in range(self.number_of_boots): - yield data.sel(type=prediction_name).squeeze() + prediction = xr.open_dataarray(file).sel(type=prediction_name).squeeze() + vals = prediction.data.repeat(self.number_of_bootstraps, axis=0) + return vals[~np.isnan(vals).any(axis=1), :] - def load_shuffled_data(self, station: str, variables: List[str]) -> xr.DataArray: + def _load_shuffled_data(self, station: str, variables: List[str]) -> xr.DataArray: """ Load shuffled data from bootstrap path. Data is stored as '<station>_<var1>_<var2>_..._hist<histsize>_nboots<nboots>_shuffled.nc', e.g. @@ -228,21 +178,21 @@ class BootStrapGenerator: :param variables: :return: shuffled data as xarray """ - file_name = self.get_shuffled_data_file(station, variables) + file_name = self._get_shuffled_data_file(station, variables) shuffled_data = xr.open_dataarray(file_name, chunks=100) return shuffled_data - def get_shuffled_data_file(self, station, variables): + def _get_shuffled_data_file(self, station, variables): files = os.listdir(self.bootstrap_path) - regex = self.create_file_regex(station, variables) - file = self.filter_files(regex, files, self.orig_generator.window_history_size, self.number_of_boots) + regex = self._create_file_regex(station, variables) + file = self._filter_files(regex, files, self.data.window_history_size, self.number_of_bootstraps) if file: return os.path.join(self.bootstrap_path, file) else: raise FileNotFoundError(f"Could not find a file to match pattern {regex}") @staticmethod - def create_file_regex(station: str, variables: List[str]) -> Pattern: + def _create_file_regex(station: str, variables: List[str]) -> Pattern: """ Creates regex for given station and variables to look for shuffled data with pattern: `<station>(_<var>)*_hist(<hist>)_nboots(<nboots>)_shuffled.nc` @@ -255,7 +205,7 @@ class BootStrapGenerator: return regex @staticmethod - def filter_files(regex: Pattern, files: List[str], window: int, nboot: int) -> Union[str, None]: + def _filter_files(regex: Pattern, files: List[str], window: int, nboot: int) -> Union[str, None]: """ Filter list of files by regex. Regex has to be structured to match the following string structure `<station>(_<var>)*_hist(<hist>)_nboots(<nboots>)_shuffled.nc`. Hist and nboots values have to be included as @@ -275,118 +225,6 @@ class BootStrapGenerator: return f -class BootStraps: - - def __init__(self, data, bootstrap_path, number_bootstraps=10): - self.data: DataGenerator = data - self.number_bootstraps = number_bootstraps - self.bootstrap_path = bootstrap_path - self.chunks = self.get_chunk_size() - self.create_shuffled_data() - self._boot_strap_generator = BootStrapGeneratorNew(self.data, self.number_bootstraps, self.bootstrap_path) - - @property - def stations(self): - return self._boot_strap_generator.stations - - @property - def variables(self): - return self._boot_strap_generator.variables - - def get_generator_station_var_wise(self, station, var): - return self._boot_strap_generator.get_generator_station_var_wise(station, var) - - def get_bootstrap_meta_station_var_wise(self, station, var): - return self._boot_strap_generator.get_bootstrap_meta_station_var_wise(station, var) - - def get_boot_strap_generator_length(self): - return self._boot_strap_generator.__len__() - - def get_labels(self, key): - labels_list = [] - chunks = None - for labels in self._boot_strap_generator.get_labels(key): - if len(labels_list) == 0: - chunks = (100, labels.data.shape[1]) - labels_list.append(da.from_array(labels.data, chunks=chunks)) - labels_out = da.concatenate(labels_list, axis=0) - return labels_out.compute() - - def get_orig_prediction(self, path, name): - labels_list = [] - chunks = None - for labels in self._boot_strap_generator.get_orig_prediction(path, name): - if len(labels_list) == 0: - chunks = (100, labels.data.shape[1]) - labels_list.append(da.from_array(labels.data, chunks=chunks)) - labels_out = da.concatenate(labels_list, axis=0) - labels_out = labels_out.compute() - return labels_out[~np.isnan(labels_out).any(axis=1), :] - - def get_chunk_size(self): - hist, _ = self.data[0] - return (100, *hist.shape[1:], self.number_bootstraps) - - def create_shuffled_data(self): - """ - Create shuffled data. Use original test data, add dimension 'boots' with length number of bootstraps and insert - randomly selected variables. If there is a suitable local file for requested window size and number of - bootstraps, no additional file will be created inside this function. - """ - logging.info("create / check shuffled bootstrap data") - variables_str = '_'.join(sorted(self.data.variables)) - window = self.data.window_history_size - for station in self.data.stations: - valid, nboot = self.valid_bootstrap_file(station, variables_str, window) - if not valid: - logging.info(f'create bootstap data for {station}') - hist, _ = self.data[station] - data = hist.copy() - file_name = f"{station}_{variables_str}_hist{window}_nboots{nboot}_shuffled.nc" - file_path = os.path.join(self.bootstrap_path, file_name) - data = data.expand_dims({'boots': range(nboot)}, axis=-1) - shuffled_variable = [] - for i, var in enumerate(data.coords['variables']): - single_variable = data.sel(variables=var).values - shuffled_variable.append(self.shuffle_single_variable(single_variable, chunks=(100, *data.shape[1:3], data.shape[-1]))) - shuffled_variable_da = da.stack(shuffled_variable, axis=-2, ).rechunk("auto") - shuffled_data = xr.DataArray(shuffled_variable_da, coords=data.coords, dims=data.dims) - shuffled_data.to_netcdf(file_path) - - def valid_bootstrap_file(self, station, variables, window): - """ - Compare local bootstrap file with given settings for station, variables, window and number of bootstraps. If a - match was found, this method returns a tuple (True, None). In any other case, it returns (False, max_nboot), - where max_nboot is the highest boot number found in the local storage. A match is defined so that the window - length is ge than given window size form args and the number of boots is also ge than the given number of boots - from this class. Furthermore, this functions deletes local files, if the match the station pattern but don't fit - the window and bootstrap condition. This is performed, because it is assumed, that the corresponding file will - be created with a longer or at the least same window size and numbers of bootstraps. - :param station: - :param variables: - :param window: - :return: - """ - regex = re.compile(rf"{station}_{variables}_hist(\d+)_nboots(\d+)_shuffled") - max_nboot = self.number_bootstraps - for file in os.listdir(self.bootstrap_path): - match = regex.match(file) - if match: - window_file = int(match.group(1)) - nboot_file = int(match.group(2)) - max_nboot = max([max_nboot, nboot_file]) - if (window_file >= window) and (nboot_file >= self.number_bootstraps): - return True, None - else: - os.remove(os.path.join(self.bootstrap_path, file)) - return False, max_nboot - - @staticmethod - def shuffle_single_variable(data: da.array, chunks) -> da.core.Array: - size = data.shape - return da.random.choice(data.reshape(-1,), size=size, chunks=chunks) - - if __name__ == "__main__": from src.run_modules.experiment_setup import ExperimentSetup diff --git a/src/run_modules/post_processing.py b/src/run_modules/post_processing.py index bb37b9c2..60d0c80d 100644 --- a/src/run_modules/post_processing.py +++ b/src/run_modules/post_processing.py @@ -104,34 +104,31 @@ class PostProcessing(RunEnvironment): bootstraps = BootStraps(self.test_data, bootstrap_path, number_of_bootstraps) # create bootstrapped predictions for all stations and variables and save it to disk + dims = ["index", "ahead", "type"] for station in bootstraps.stations: with TimeTracking(name=station): logging.info(station) for var in bootstraps.variables: - hist, label, station_bootstrap, length = bootstraps.get_generator_station_var_wise(station, var) + station_bootstrap = bootstraps.get_generator_station_var_wise(station, var) # make bootstrap predictions bootstrap_predictions = self.model.predict_generator(generator=station_bootstrap, - steps=length, - workers=4, + workers=2, use_multiprocessing=True) if isinstance(bootstrap_predictions, list): # if model is branched model bootstrap_predictions = bootstrap_predictions[-1] - # get bootstrap prediction meta data - bootstrap_meta = np.array(bootstraps.get_bootstrap_meta_station_var_wise(station, var)) # save bootstrap predictions separately for each station and variable combination # store each variable - station - combination - ind = np.all(bootstrap_meta == [var, station], axis=1) - length = sum(ind) - sel = bootstrap_predictions[ind].reshape((length, window_lead_time, 1)) - coords = (range(length), range(1, window_lead_time + 1)) - tmp = xr.DataArray(sel, coords=(*coords, [var]), dims=["index", "ahead", "type"]) + bootstrap_predictions = np.expand_dims(bootstrap_predictions, axis=-1) + shape = bootstrap_predictions.shape + coords = (range(shape[0]), range(1, shape[1] + 1)) + tmp = xr.DataArray(bootstrap_predictions, coords=(*coords, [var]), dims=dims) file_name = os.path.join(forecast_path, f"bootstraps_{var}_{station}.nc") tmp.to_netcdf(file_name) # store also true labels for each station - labels = bootstraps.get_labels(station).reshape((length, window_lead_time, 1)) + labels = np.expand_dims(bootstraps.get_labels(station), axis=-1) file_name = os.path.join(forecast_path, f"bootstraps_labels_{station}.nc") - labels = xr.DataArray(labels, coords=(*coords, ["obs"]), dims=["index", "ahead", "type"]) + labels = xr.DataArray(labels, coords=(*coords, ["obs"]), dims=dims) labels.to_netcdf(file_name) def calculate_bootstrap_skill_scores(self) -> Dict[str, xr.DataArray]: -- GitLab