diff --git a/mlair/configuration/defaults.py b/mlair/configuration/defaults.py index 088a504ab623198f0cc37815717a7ce291dda461..d61146b61a5ade9118675fa7b895212f310acc71 100644 --- a/mlair/configuration/defaults.py +++ b/mlair/configuration/defaults.py @@ -46,6 +46,8 @@ DEFAULT_USE_ALL_STATIONS_ON_ALL_DATA_SETS = True DEFAULT_EVALUATE_BOOTSTRAPS = True DEFAULT_CREATE_NEW_BOOTSTRAPS = False DEFAULT_NUMBER_OF_BOOTSTRAPS = 20 +DEFAULT_BOOTSTRAP_TYPE = "singleinput" +DEFAULT_BOOTSTRAP_METHOD = "shuffle" DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore", "PlotTimeSeries", "PlotCompetitiveSkillScore", "PlotBootstrapSkillScore", "PlotConditionalQuantiles", "PlotAvailability", "PlotAvailabilityHistogram", "PlotDataHistogram", "PlotPeriodogram"] diff --git a/mlair/data_handler/bootstraps.py b/mlair/data_handler/bootstraps.py index 68a4bbc4bc9620bfb54ba23fef1ce882e76c8626..e03881484bfc9b8275ede8a4432072c74643994a 100644 --- a/mlair/data_handler/bootstraps.py +++ b/mlair/data_handler/bootstraps.py @@ -15,69 +15,175 @@ __date__ = '2020-02-07' import os from collections import Iterator, Iterable from itertools import chain +from typing import Union, List import numpy as np import xarray as xr from mlair.data_handler.abstract_data_handler import AbstractDataHandler +from mlair.helpers.helpers import to_list class BootstrapIterator(Iterator): _position: int = None - def __init__(self, data: "BootStraps"): + def __init__(self, data: "BootStraps", method): assert isinstance(data, BootStraps) self._data = data self._dimension = data.bootstrap_dimension - self._collection = self._data.bootstraps() + self.boot_dim = "boots" + self._method = method + self._collection = self.create_collection(self._data.data, self._dimension) self._position = 0 + def __next__(self): + """Return next element or stop iteration.""" + raise NotImplementedError + + @classmethod + def create_collection(cls, data, dim): + raise NotImplementedError + + def _reshape(self, d): + if isinstance(d, list): + return list(map(lambda x: self._reshape(x), d)) + # return list(map(lambda x: np.rollaxis(x, -1, 0).reshape(x.shape[0] * x.shape[-1], *x.shape[1:-1]), d)) + else: + shape = d.shape + return np.rollaxis(d, -1, 0).reshape(shape[0] * shape[-1], *shape[1:-1]) + + def _to_numpy(self, d): + if isinstance(d, list): + return list(map(lambda x: self._to_numpy(x), d)) + else: + return d.values + + def apply_bootstrap_method(self, data: np.ndarray) -> Union[np.ndarray, List[np.ndarray]]: + """ + Apply predefined bootstrap method from given data. + + :param data: data to apply bootstrap method on + :return: processed data as numpy array + """ + if isinstance(data, list): + return list(map(lambda x: self.apply_bootstrap_method(x.values), data)) + else: + return self._method.apply(data) + + +class BootstrapIteratorSingleInput(BootstrapIterator): + _position: int = None + + def __init__(self, *args): + super().__init__(*args) + def __next__(self): """Return next element or stop iteration.""" try: index, dimension = self._collection[self._position] nboot = self._data.number_of_bootstraps _X, _Y = self._data.data.get_data(as_numpy=False) - _X = list(map(lambda x: x.expand_dims({'boots': range(nboot)}, axis=-1), _X)) - _Y = _Y.expand_dims({"boots": range(nboot)}, axis=-1) + _X = list(map(lambda x: x.expand_dims({self.boot_dim: range(nboot)}, axis=-1), _X)) + _Y = _Y.expand_dims({self.boot_dim: range(nboot)}, axis=-1) single_variable = _X[index].sel({self._dimension: [dimension]}) - shuffled_variable = self.shuffle(single_variable.values) - shuffled_data = xr.DataArray(shuffled_variable, coords=single_variable.coords, dims=single_variable.dims) - _X[index] = shuffled_data.combine_first(_X[index]).reindex_like(_X[index]) + bootstrapped_variable = self.apply_bootstrap_method(single_variable.values) + bootstrapped_data = xr.DataArray(bootstrapped_variable, coords=single_variable.coords, + dims=single_variable.dims) + _X[index] = bootstrapped_data.combine_first(_X[index]).reindex_like(_X[index]) self._position += 1 except IndexError: raise StopIteration() _X, _Y = self._to_numpy(_X), self._to_numpy(_Y) return self._reshape(_X), self._reshape(_Y), (index, dimension) - @staticmethod - def _reshape(d): - if isinstance(d, list): - return list(map(lambda x: np.rollaxis(x, -1, 0).reshape(x.shape[0] * x.shape[-1], *x.shape[1:-1]), d)) - else: - shape = d.shape - return np.rollaxis(d, -1, 0).reshape(shape[0] * shape[-1], *shape[1:-1]) + @classmethod + def create_collection(cls, data, dim): + l = [] + for i, x in enumerate(data.get_X(as_numpy=False)): + l.append(list(map(lambda y: (i, y), x.indexes[dim]))) + return list(chain(*l)) - @staticmethod - def _to_numpy(d): - if isinstance(d, list): - return list(map(lambda x: x.values, d)) - else: - return d.values - @staticmethod - def shuffle(data: np.ndarray) -> np.ndarray: - """ - Shuffle randomly from given data (draw elements with replacement). +class BootstrapIteratorVariable(BootstrapIterator): - :param data: data to shuffle - :return: shuffled data as numpy array - """ + def __init__(self, *args): + super().__init__(*args) + + def __next__(self): + """Return next element or stop iteration.""" + try: + dimension = self._collection[self._position] + nboot = self._data.number_of_bootstraps + _X, _Y = self._data.data.get_data(as_numpy=False) + _X = list(map(lambda x: x.expand_dims({self.boot_dim: range(nboot)}, axis=-1), _X)) + _Y = _Y.expand_dims({self.boot_dim: range(nboot)}, axis=-1) + for index in range(len(_X)): + single_variable = _X[index].sel({self._dimension: [dimension]}) + bootstrapped_variable = self.apply_bootstrap_method(single_variable.values) + bootstrapped_data = xr.DataArray(bootstrapped_variable, coords=single_variable.coords, + dims=single_variable.dims) + _X[index] = bootstrapped_data.combine_first(_X[index]).transpose(*_X[index].dims) + self._position += 1 + except IndexError: + raise StopIteration() + _X, _Y = self._to_numpy(_X), self._to_numpy(_Y) + return self._reshape(_X), self._reshape(_Y), (None, dimension) + + @classmethod + def create_collection(cls, data, dim): + l = set() + for i, x in enumerate(data.get_X(as_numpy=False)): + l.update(x.indexes[dim].to_list()) + return to_list(l) + + +class BootstrapIteratorBranch(BootstrapIterator): + + def __init__(self, *args): + super().__init__(*args) + + def __next__(self): + try: + index = self._collection[self._position] + nboot = self._data.number_of_bootstraps + _X, _Y = self._data.data.get_data(as_numpy=False) + _X = list(map(lambda x: x.expand_dims({self.boot_dim: range(nboot)}, axis=-1), _X)) + _Y = _Y.expand_dims({self.boot_dim: range(nboot)}, axis=-1) + for dimension in _X[index].coords[self._dimension].values: + single_variable = _X[index].sel({self._dimension: [dimension]}) + bootstrapped_variable = self.apply_bootstrap_method(single_variable.values) + bootstrapped_data = xr.DataArray(bootstrapped_variable, coords=single_variable.coords, + dims=single_variable.dims) + _X[index] = bootstrapped_data.combine_first(_X[index]).transpose(*_X[index].dims) + self._position += 1 + except IndexError: + raise StopIteration() + _X, _Y = self._to_numpy(_X), self._to_numpy(_Y) + return self._reshape(_X), self._reshape(_Y), (None, index) + + @classmethod + def create_collection(cls, data, dim): + return list(range(len(data.get_X(as_numpy=False)))) + + +class ShuffleBootstraps: + + @staticmethod + def apply(data): size = data.shape return np.random.choice(data.reshape(-1, ), size=size) +class MeanBootstraps: + + def __init__(self, mean): + self._mean = mean + + def apply(self, data): + return np.ones_like(data) * self._mean + + class BootStraps(Iterable): """ Main class to perform bootstrap operations. @@ -89,10 +195,19 @@ class BootStraps(Iterable): this variable). The tuple is interesting if X consists on mutliple input streams X_i (e.g. two or more stations) because it shows which variable of which input X_i has been bootstrapped. All bootstrap combinations can be retrieved by calling the .bootstraps() method. Further more, by calling the .get_orig_prediction() this class - imitates according to the set number of bootstraps the original prediction + imitates according to the set number of bootstraps the original prediction. + + As bootstrap method, this class can currently make use of the ShuffleBoostraps class that uses drawing with + replacement to destroy the variables information by keeping its statistical properties. Use `bootstrap="shuffle"` to + call this method. Another method is the zero mean bootstrapping triggered by `bootstrap="zero_mean"` and performed + by the MeanBootstraps class. This method destroy the variable's information by a mode collapse to constant value of + zero. In case, the variable is normalized with a zero mean, this is equivalent to a mode collapse to the variable's + mean value. Statistics in general are not conserved in this case, but the mean value of course. A custom mean value + for bootstrapping is currently not supported. """ + def __init__(self, data: AbstractDataHandler, number_of_bootstraps: int = 10, - bootstrap_dimension: str = "variables"): + bootstrap_dimension: str = "variables", bootstrap_type="singleinput", bootstrap_method="shuffle"): """ Create iterable class to be ready to iter. @@ -100,20 +215,24 @@ class BootStraps(Iterable): :param number_of_bootstraps: the number of bootstrap realisations """ self.data = data - self.number_of_bootstraps = number_of_bootstraps + self.number_of_bootstraps = number_of_bootstraps if bootstrap_method == "shuffle" else 1 self.bootstrap_dimension = bootstrap_dimension + self.bootstrap_method = {"shuffle": ShuffleBootstraps(), + "zero_mean": MeanBootstraps(mean=0)}.get( + bootstrap_method) # todo adjust number of bootstraps if mean bootstrapping + self.BootstrapIterator = {"singleinput": BootstrapIteratorSingleInput, + "branch": BootstrapIteratorBranch, + "variable": BootstrapIteratorVariable}.get(bootstrap_type, + BootstrapIteratorSingleInput) def __iter__(self): - return BootstrapIterator(self) + return self.BootstrapIterator(self, self.bootstrap_method) def __len__(self): - return len(self.bootstraps()) + return len(self.BootstrapIterator.create_collection(self.data, self.bootstrap_dimension)) def bootstraps(self): - l = [] - for i, x in enumerate(self.data.get_X(as_numpy=False)): - l.append(list(map(lambda y: (i, y), x.indexes['variables']))) - return list(chain(*l)) + return self.BootstrapIterator.create_collection(self.data, self.bootstrap_dimension) def get_orig_prediction(self, path: str, file_name: str, prediction_name: str = "CNN") -> np.ndarray: """ diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py index 80253b0a5330926fb123bf032772f5d063267213..e76f396aea80b2db76e01ea5baacf71d024b0d23 100644 --- a/mlair/data_handler/data_handler_with_filter.py +++ b/mlair/data_handler/data_handler_with_filter.py @@ -61,6 +61,8 @@ class DataHandlerFilterSingleStation(DataHandlerSingleStation): for k, v in transformation[0].items(): if v["method"] == "log": transformation[0][k]["method"] = "standardise" + elif v["method"] == "min_max": + transformation[0][k]["method"] = "standardise" return transformation def _check_sampling(self, **kwargs): diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index a551bec4220b01012c6d6962e6dfee135fbe5d6a..a63cef975888162f335e4528c2f99bdfc7a892d5 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -408,7 +408,8 @@ class ClimateFIRFilter: continue logging.debug(f"{data.coords['Stations'].values[0]} ({var}): start filter convolve") - with TimeTracking(name=f"{data.coords['Stations'].values[0]} ({var}): filter convolve"): + with TimeTracking(name=f"{data.coords['Stations'].values[0]} ({var}): filter convolve", + logging_level=logging.DEBUG): filt = xr.apply_ufunc(fir_filter_convolve, filter_input_data, input_core_dims=[[new_dim]], output_core_dims=[[new_dim]], diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py index d5499294227cae1f49797caafe354bef183a94b3..a1e713a8c135800d02ff7c27894485a5da7fae37 100644 --- a/mlair/helpers/statistics.py +++ b/mlair/helpers/statistics.py @@ -257,11 +257,12 @@ class SkillScores: """ models_default = ["cnn", "persi", "ols"] - def __init__(self, external_data: Data, models=None, observation_name="obs"): + def __init__(self, external_data: Union[Data, None], models=None, observation_name="obs", ahead_dim="ahead"): """Set internal data.""" self.external_data = external_data self.models = self.set_model_names(models) self.observation_name = observation_name + self.ahead_dim = ahead_dim def set_model_names(self, models: List[str]) -> List[str]: """Either use given models or use defaults.""" @@ -283,19 +284,17 @@ class SkillScores: combination_strings = [f"{first}-{second}" for (first, second) in combinations] return combinations, combination_strings - def skill_scores(self, window_lead_time: int, ahead_dim="ahead") -> pd.DataFrame: + def skill_scores(self) -> pd.DataFrame: """ Calculate skill scores for all combinations of model names. - :param window_lead_time: length of forecast steps - :return: skill score for each comparison and forecast step """ - ahead_names = list(self.external_data[ahead_dim].data) + ahead_names = list(self.external_data[self.ahead_dim].data) combinations, combination_strings = self.get_model_name_combinations() skill_score = pd.DataFrame(index=combination_strings) for iahead in ahead_names: - data = self.external_data.sel(ahead=iahead) + data = self.external_data.sel({self.ahead_dim: iahead}) skill_score[iahead] = [self.general_skill_score(data, forecast_name=first, reference_name=second, @@ -303,8 +302,7 @@ class SkillScores: for (first, second) in combinations] return skill_score - def climatological_skill_scores(self, internal_data: Data, window_lead_time: int, - forecast_name: str, ahead_dim="ahead") -> xr.DataArray: + def climatological_skill_scores(self, internal_data: Data, forecast_name: str) -> xr.DataArray: """ Calculate climatological skill scores according to Murphy (1988). @@ -312,20 +310,19 @@ class SkillScores: is part of parameters. :param internal_data: internal data - :param window_lead_time: interested time step of forecast horizon to select data :param forecast_name: name of the forecast to use for this calculation (must be available in `data`) :return: all CASES as well as all terms """ - ahead_names = list(self.external_data[ahead_dim].data) + ahead_names = list(self.external_data[self.ahead_dim].data) all_terms = ['AI', 'AII', 'AIII', 'AIV', 'BI', 'BII', 'BIV', 'CI', 'CIV', 'CASE I', 'CASE II', 'CASE III', 'CASE IV'] skill_score = xr.DataArray(np.full((len(all_terms), len(ahead_names)), np.nan), coords=[all_terms, ahead_names], - dims=['terms', 'ahead']) + dims=['terms', self.ahead_dim]) for iahead in ahead_names: - data = internal_data.sel(ahead=iahead) + data = internal_data.sel({self.ahead_dim: iahead}) skill_score.loc[["CASE I", "AI", "BI", "CI"], iahead] = np.stack(self._climatological_skill_score( data, mu_type=1, forecast_name=forecast_name, observation_name=self.observation_name).values.flatten()) @@ -334,7 +331,7 @@ class SkillScores: data, mu_type=2, forecast_name=forecast_name, observation_name=self.observation_name).values.flatten()) if self.external_data is not None and self.observation_name in self.external_data.coords["type"]: - external_data = self.external_data.sel(ahead=iahead, type=[self.observation_name]) + external_data = self.external_data.sel({self.ahead_dim: iahead, "type": [self.observation_name]}) skill_score.loc[["CASE III", "AIII"], iahead] = np.stack(self._climatological_skill_score( data, mu_type=3, forecast_name=forecast_name, observation_name=self.observation_name, external_data=external_data).values.flatten()) @@ -373,12 +370,12 @@ class SkillScores: skill_score = 1 - mse(observation, forecast) / mse(observation, reference) return skill_score.values - @staticmethod - def skill_score_pre_calculations(data: Data, observation_name: str, forecast_name: str) -> Tuple[np.ndarray, - np.ndarray, - np.ndarray, - Data, - Dict[str, Data]]: + def skill_score_pre_calculations(self, data: Data, observation_name: str, forecast_name: str) -> Tuple[np.ndarray, + np.ndarray, + np.ndarray, + Data, + Dict[ + str, Data]]: """ Calculate terms AI, BI, and CI, mean, variance and pearson's correlation and clean up data. @@ -391,7 +388,7 @@ class SkillScores: :returns: Terms AI, BI, and CI, internal data without nans and mean, variance, correlation and its p-value """ - data = data.sel(type=[observation_name, forecast_name]).drop("ahead") + data = data.sel(type=[observation_name, forecast_name]).drop(self.ahead_dim) data = data.dropna("index") mean = data.mean("index") diff --git a/mlair/helpers/time_tracking.py b/mlair/helpers/time_tracking.py index c85a6a047943a589a9d076584ae40186634db767..3105ebcd04406b7d449ba312bd3af46f83e3a716 100644 --- a/mlair/helpers/time_tracking.py +++ b/mlair/helpers/time_tracking.py @@ -68,11 +68,12 @@ class TimeTracking(object): The only disadvantage of the latter implementation is, that the duration is logged but not returned. """ - def __init__(self, start=True, name="undefined job"): + def __init__(self, start=True, name="undefined job", logging_level=logging.INFO): """Construct time tracking and start if enabled.""" self.start = None self.end = None self._name = name + self._logging = {logging.INFO: logging.info, logging.DEBUG: logging.debug}.get(logging_level, logging.info) if start: self._start() @@ -128,4 +129,4 @@ class TimeTracking(object): def __exit__(self, exc_type, exc_val, exc_tb) -> None: """Stop time tracking on exit and log info about passed time.""" self.stop() - logging.info(f"{self._name} finished after {self}") \ No newline at end of file + self._logging(f"{self._name} finished after {self}") diff --git a/mlair/model_modules/fully_connected_networks.py b/mlair/model_modules/fully_connected_networks.py index 2145538366711728ac1f5b8f748a63a4b198e2eb..0338033315d294c2e54de8b038bba2123d2fee77 100644 --- a/mlair/model_modules/fully_connected_networks.py +++ b/mlair/model_modules/fully_connected_networks.py @@ -374,7 +374,7 @@ class BranchedInputFCN(AbstractModelClass): print(self.model.summary()) def set_compile_options(self): - # self.compile_options = {"loss": [keras.losses.mean_squared_error], - # "metrics": ["mse", "mae", var_loss]} - self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss], loss_weights=[2, 1])], + self.compile_options = {"loss": [keras.losses.mean_squared_error], "metrics": ["mse", "mae", var_loss]} + # self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss], loss_weights=[2, 1])], + # "metrics": ["mse", "mae", var_loss]} diff --git a/mlair/plotting/postprocessing_plotting.py b/mlair/plotting/postprocessing_plotting.py index 491aa52e0a9fe0010f77cde315d1f9b7ddb76dfb..fe658b093b54a5321774fd6d5c613def994aa61c 100644 --- a/mlair/plotting/postprocessing_plotting.py +++ b/mlair/plotting/postprocessing_plotting.py @@ -608,7 +608,9 @@ class PlotBootstrapSkillScore(AbstractPlotClass): """ - def __init__(self, data: Dict, plot_folder: str = ".", model_setup: str = "", separate_vars: List = None): + def __init__(self, data: Dict, plot_folder: str = ".", model_setup: str = "", separate_vars: List = None, + sampling: str = "daily", ahead_dim: str = "ahead", bootstrap_type: str = None, + bootstrap_method: str = None): """ Set attributes and create plot. @@ -616,20 +618,46 @@ class PlotBootstrapSkillScore(AbstractPlotClass): :param plot_folder: path to save the plot (default: current directory) :param model_setup: architecture type to specify plot name (default "CNN") :param separate_vars: variables to plot separated (default: ['o3']) + :param sampling: type of sampling rate, should be either hourly or daily (default: "daily") + :param ahead_dim: name of the ahead dimensions (default: "ahead") + :param bootstrap_annotation: additional information to use in the file name (default: None) """ - super().__init__(plot_folder, f"skill_score_bootstrap_{model_setup}") + annotation = ["_".join([s for s in ["", bootstrap_type, bootstrap_method] if s is not None])][0] + super().__init__(plot_folder, f"skill_score_bootstrap_{model_setup}{annotation}") if separate_vars is None: separate_vars = ['o3'] self._labels = None self._x_name = "boot_var" - self._data = self._prepare_data(data) - self._plot() - self._save() - self.plot_name += '_separated' - self._plot(separate_vars=separate_vars) - self._save(bbox_inches='tight') + self._ahead_dim = ahead_dim + self._boot_type = self._set_bootstrap_type(bootstrap_type) + self._boot_method = self._set_bootstrap_method(bootstrap_method) + + self._title = f"Bootstrap analysis ({self._boot_method}, {self._boot_type})" + self._data = self._prepare_data(data, sampling) + if "branch" in self._data.columns: + plot_name = self.plot_name + for branch in self._data["branch"].unique(): + self._title = f"Bootstrap analysis ({self._boot_method}, {self._boot_type}, {branch})" + self._plot(branch=branch) + self.plot_name = f"{plot_name}_{branch}" + self._save() + else: + self._plot() + self._save() + if len(set(separate_vars).intersection(self._data[self._x_name].unique())) > 0: + self.plot_name += '_separated' + self._plot(separate_vars=separate_vars) + self._save(bbox_inches='tight') + + @staticmethod + def _set_bootstrap_type(boot_type): + return {"singleinput": "single input"}.get(boot_type, boot_type) + + @staticmethod + def _set_bootstrap_method(boot_method): + return {"zero_mean": "zero mean", "shuffle": "shuffled"}.get(boot_method, boot_method) - def _prepare_data(self, data: Dict) -> pd.DataFrame: + def _prepare_data(self, data: Dict, sampling: str) -> pd.DataFrame: """ Shrink given data, if only scores are relevant. @@ -639,23 +667,53 @@ class PlotBootstrapSkillScore(AbstractPlotClass): :param data: dictionary with station names as keys and 2D xarrays as values :return: pre-processed data set """ - data = helpers.dict_to_xarray(data, "station").sortby(self._x_name) - new_boot_coords = self._return_vars_without_number_tag(data.coords['boot_var'].values, split_by='_', keep=1) - data = data.assign_coords({'boot_var': new_boot_coords}) - self._labels = [str(i) + "d" for i in data.coords["ahead"].values] - if "station" not in data.dims: - data = data.expand_dims("station") - return data.to_dataframe("data").reset_index(level=[0, 1, 2]) + station_dim = "station" + data = helpers.dict_to_xarray(data, station_dim).sortby(self._x_name) + if self._boot_type == "single input": + number_tags = self._get_number_tag(data.coords[self._x_name].values, split_by='_') + new_boot_coords = self._return_vars_without_number_tag(data.coords[self._x_name].values, split_by='_', + keep=1, as_unique=True) + values = data.values.reshape((data.shape[0], len(new_boot_coords), len(number_tags), data.shape[-1])) + data = xr.DataArray(values, coords={station_dim: data.coords["station"], self._x_name: new_boot_coords, + "branch": number_tags, self._ahead_dim: data.coords[self._ahead_dim]}, + dims=[station_dim, self._x_name, "branch", self._ahead_dim]) + else: + try: + new_boot_coords = self._return_vars_without_number_tag(data.coords[self._x_name].values, split_by='_', + keep=1) + data = data.assign_coords({self._x_name: new_boot_coords}) + except NotImplementedError: + pass + _, sampling_letter = self._get_target_sampling(sampling, 1) + self._labels = [str(i) + sampling_letter for i in data.coords[self._ahead_dim].values] + if station_dim not in data.dims: + data = data.expand_dims(station_dim) + return data.to_dataframe("data").reset_index(level=np.arange(len(data.dims)).tolist()) + + @staticmethod + def _get_target_sampling(sampling, pos): + sampling = (sampling, sampling) if isinstance(sampling, str) else sampling + sampling_letter = {"hourly": "H", "daily": "d"}.get(sampling[pos], "") + return sampling, sampling_letter - def _return_vars_without_number_tag(self, values, split_by, keep): + def _return_vars_without_number_tag(self, values, split_by, keep, as_unique=False): arr = np.array([v.split(split_by) for v in values]) num = arr[:, 0] + if arr.shape[keep] == 1: # keep dim has only length 1, no number tags required + return num new_val = arr[:, keep] if self._all_values_are_equal(num, axis=0): return new_val + elif as_unique is True: + return np.unique(new_val) else: raise NotImplementedError + @staticmethod + def _get_number_tag(values, split_by): + arr = np.array([v.split(split_by) for v in values]) + num = arr[:, 0] + return np.unique(num).tolist() @staticmethod def _all_values_are_equal(arr, axis=0): @@ -673,45 +731,36 @@ class PlotBootstrapSkillScore(AbstractPlotClass): """ return "" if score_only else "terms and " - def _plot(self, separate_vars=None): + def _plot(self, branch=None, separate_vars=None): """Plot climatological skill score.""" if separate_vars is None: - self._plot_all_variables() + self._plot_all_variables(branch) else: self._plot_selected_variables(separate_vars) def _plot_selected_variables(self, separate_vars: List): - # if separate_vars is None: - # separate_vars = ['o3'] data = self._data - self.raise_error_if_separate_vars_do_not_exist(data, separate_vars) - all_variables = self._get_unique_values_from_column_of_df(data, 'boot_var') - # remaining_vars = helpers.list_pop(all_variables, separate_vars) #remove_items + self.raise_error_if_separate_vars_do_not_exist(data, separate_vars, self._x_name) + all_variables = self._get_unique_values_from_column_of_df(data, self._x_name) remaining_vars = helpers.remove_items(all_variables, separate_vars) - data_first = self._select_data(df=data, variables=separate_vars, column_name='boot_var') - data_second = self._select_data(df=data, variables=remaining_vars, column_name='boot_var') - - fig, ax = plt.subplots(nrows=1, ncols=2, - gridspec_kw={'width_ratios': [len(separate_vars), - len(remaining_vars) - ] - } - ) + data_first = self._select_data(df=data, variables=separate_vars, column_name=self._x_name) + data_second = self._select_data(df=data, variables=remaining_vars, column_name=self._x_name) + + fig, ax = plt.subplots(nrows=1, ncols=2, gridspec_kw={'width_ratios': [len(separate_vars), + len(remaining_vars)]}) if len(separate_vars) > 1: first_box_width = .8 else: first_box_width = 2. - sns.boxplot(x=self._x_name, y="data", hue="ahead", data=data_first, ax=ax[0], whis=1., palette="Blues_d", - showmeans=True, meanprops={"markersize": 1, "markeredgecolor": "k"}, - flierprops={"marker": "."}, width=first_box_width - ) + sns.boxplot(x=self._x_name, y="data", hue=self._ahead_dim, data=data_first, ax=ax[0], whis=1., + palette="Blues_d", showmeans=True, meanprops={"markersize": 1, "markeredgecolor": "k"}, + flierprops={"marker": "."}, width=first_box_width) ax[0].set(ylabel=f"skill score", xlabel="") - sns.boxplot(x=self._x_name, y="data", hue="ahead", data=data_second, ax=ax[1], whis=1., palette="Blues_d", - showmeans=True, meanprops={"markersize": 1, "markeredgecolor": "k"}, - flierprops={"marker": "."}, - ) + sns.boxplot(x=self._x_name, y="data", hue=self._ahead_dim, data=data_second, ax=ax[1], whis=1., + palette="Blues_d", showmeans=True, meanprops={"markersize": 1, "markeredgecolor": "k"}, + flierprops={"marker": "."}) ax[1].set(ylabel="", xlabel="") ax[1].yaxis.tick_right() handles, _ = ax[1].get_legend_handles_labels() @@ -746,9 +795,11 @@ class PlotBootstrapSkillScore(AbstractPlotClass): align_yaxis(ax[0], ax[1]) align_yaxis(ax[0], ax[1]) + plt.title(self._title) @staticmethod def _select_data(df: pd.DataFrame, variables: List[str], column_name: str) -> pd.DataFrame: + selected_data = None for i, variable in enumerate(variables): if i == 0: selected_data = df.loc[df[column_name] == variable] @@ -757,28 +808,29 @@ class PlotBootstrapSkillScore(AbstractPlotClass): selected_data = pd.concat([selected_data, tmp_var], axis=0) return selected_data - def raise_error_if_separate_vars_do_not_exist(self, data, separate_vars): - if not self._variables_exist_in_df(df=data, variables=separate_vars): + def raise_error_if_separate_vars_do_not_exist(self, data, separate_vars, column_name): + if not self._variables_exist_in_df(df=data, variables=separate_vars, column_name=column_name): raise ValueError(f"At least one entry of `separate_vars' does not exist in `self.data' ") @staticmethod def _get_unique_values_from_column_of_df(df: pd.DataFrame, column_name: str) -> List: return list(df[column_name].unique()) - def _variables_exist_in_df(self, df: pd.DataFrame, variables: List[str], column_name: str = 'boot_var'): + def _variables_exist_in_df(self, df: pd.DataFrame, variables: List[str], column_name: str): vars_in_df = set(self._get_unique_values_from_column_of_df(df, column_name)) return set(variables).issubset(vars_in_df) - def _plot_all_variables(self): + def _plot_all_variables(self, branch=None): """ """ fig, ax = plt.subplots() - sns.boxplot(x=self._x_name, y="data", hue="ahead", data=self._data, ax=ax, whis=1., palette="Blues_d", + plot_data = self._data if branch is None else self._data[self._data["branch"] == str(branch)] + sns.boxplot(x=self._x_name, y="data", hue=self._ahead_dim, data=plot_data, ax=ax, whis=1., palette="Blues_d", showmeans=True, meanprops={"markersize": 1, "markeredgecolor": "k"}, flierprops={"marker": "."}) ax.axhline(y=0, color="grey", linewidth=.5) plt.xticks(rotation=45) - ax.set(ylabel=f"skill score", xlabel="", title="summary of all stations") + ax.set(ylabel=f"skill score", xlabel="", title=self._title) handles, _ = ax.get_legend_handles_labels() ax.legend(handles, self._labels) plt.tight_layout() @@ -893,8 +945,6 @@ class PlotTimeSeries: def _plot_obs(self, ax, data): ahead = 1 obs_data = data.sel(type="obs", ahead=ahead).shift(index=ahead) - # index = data.index + np.timedelta64(1, self._sampling) - # ax.plot(index, obs_data.values, color=matplotlib.colors.cnames["green"], label="obs") ax.plot(obs_data, color=matplotlib.colors.cnames["green"], label="obs") @staticmethod diff --git a/mlair/run_modules/experiment_setup.py b/mlair/run_modules/experiment_setup.py index bd06914f3f7c2e8f745afbd4998eed68964b6fa1..8036413c8aefc3f70f8c24e59812c1a3b3324de1 100644 --- a/mlair/run_modules/experiment_setup.py +++ b/mlair/run_modules/experiment_setup.py @@ -19,7 +19,8 @@ from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_VAL_MIN_LENGTH, DEFAULT_TEST_START, DEFAULT_TEST_END, DEFAULT_TEST_MIN_LENGTH, DEFAULT_TRAIN_VAL_MIN_LENGTH, \ DEFAULT_USE_ALL_STATIONS_ON_ALL_DATA_SETS, DEFAULT_EVALUATE_BOOTSTRAPS, DEFAULT_CREATE_NEW_BOOTSTRAPS, \ DEFAULT_NUMBER_OF_BOOTSTRAPS, DEFAULT_PLOT_LIST, DEFAULT_SAMPLING, DEFAULT_DATA_ORIGIN, DEFAULT_ITER_DIM, \ - DEFAULT_USE_MULTIPROCESSING, DEFAULT_USE_MULTIPROCESSING_ON_DEBUG, DEFAULT_MAX_NUMBER_MULTIPROCESSING + DEFAULT_USE_MULTIPROCESSING, DEFAULT_USE_MULTIPROCESSING_ON_DEBUG, DEFAULT_MAX_NUMBER_MULTIPROCESSING, \ + DEFAULT_BOOTSTRAP_TYPE, DEFAULT_BOOTSTRAP_METHOD from mlair.data_handler import DefaultDataHandler from mlair.run_modules.run_environment import RunEnvironment from mlair.model_modules.fully_connected_networks import FCN_64_32_16 as VanillaModel @@ -211,8 +212,8 @@ class ExperimentSetup(RunEnvironment): create_new_model=None, bootstrap_path=None, permute_data_on_training=None, transformation=None, train_min_length=None, val_min_length=None, test_min_length=None, extreme_values: list = None, extremes_on_right_tail_only: bool = None, evaluate_bootstraps=None, plot_list=None, - number_of_bootstraps=None, - create_new_bootstraps=None, data_path: str = None, batch_path: str = None, login_nodes=None, + number_of_bootstraps=None, create_new_bootstraps=None, bootstrap_method=None, bootstrap_type=None, + data_path: str = None, batch_path: str = None, login_nodes=None, hpc_hosts=None, model=None, batch_size=None, epochs=None, data_handler=None, data_origin: Dict = None, competitors: list = None, competitor_path: str = None, use_multiprocessing: bool = None, use_multiprocessing_on_debug: bool = None, @@ -347,6 +348,8 @@ class ExperimentSetup(RunEnvironment): self._set_param("create_new_bootstraps", create_new_bootstraps, scope="general.postprocessing") self._set_param("number_of_bootstraps", number_of_bootstraps, default=DEFAULT_NUMBER_OF_BOOTSTRAPS, scope="general.postprocessing") + self._set_param("bootstrap_method", bootstrap_method, default=DEFAULT_BOOTSTRAP_METHOD) + self._set_param("bootstrap_type", bootstrap_type, default=DEFAULT_BOOTSTRAP_TYPE) self._set_param("plot_list", plot_list, default=DEFAULT_PLOT_LIST, scope="general.postprocessing") self._set_param("neighbors", ["DEBW030"]) # TODO: just for testing diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py index 0d7bfeb4c411eeeb4550bf33e187053ca84cd551..2c6a35394749d623aa8c942f58af244be0e21003 100644 --- a/mlair/run_modules/post_processing.py +++ b/mlair/run_modules/post_processing.py @@ -103,7 +103,10 @@ class PostProcessing(RunEnvironment): if self.data_store.get("evaluate_bootstraps", "postprocessing"): with TimeTracking(name="calculate bootstraps"): create_new_bootstraps = self.data_store.get("create_new_bootstraps", "postprocessing") - self.bootstrap_postprocessing(create_new_bootstraps) + bootstrap_method = self.data_store.get("bootstrap_method", "postprocessing") + bootstrap_type = self.data_store.get("bootstrap_type", "postprocessing") + self.bootstrap_postprocessing(create_new_bootstraps, bootstrap_type=bootstrap_type, + bootstrap_method=bootstrap_method) # skill scores and error metrics with TimeTracking(name="calculate skill scores"): @@ -136,7 +139,8 @@ class PostProcessing(RunEnvironment): continue return xr.concat(competing_predictions, "type") if len(competing_predictions) > 0 else None - def bootstrap_postprocessing(self, create_new_bootstraps: bool, _iter: int = 0) -> None: + def bootstrap_postprocessing(self, create_new_bootstraps: bool, _iter: int = 0, bootstrap_type="singleinput", + bootstrap_method="shuffle") -> None: """ Calculate skill scores of bootstrapped data. @@ -149,18 +153,26 @@ class PostProcessing(RunEnvironment): :param _iter: internal counter to reduce unnecessary recursive calls (maximum number is 2, otherwise something went wrong). """ - try: - if create_new_bootstraps: - self.create_bootstrap_forecast() - self.bootstrap_skill_scores = self.calculate_bootstrap_skill_scores() - except FileNotFoundError: - if _iter != 0: - raise RuntimeError("bootstrap_postprocessing is called for the 2nd time. This means, that calling" - "manually the reason for the failure.") - logging.info("Couldn't load all files, restart bootstrap postprocessing with create_new_bootstraps=True.") - self.bootstrap_postprocessing(True, _iter=1) - - def create_bootstrap_forecast(self) -> None: + self.bootstrap_skill_scores = {} + for boot_type in to_list(bootstrap_type): + self.bootstrap_skill_scores[boot_type] = {} + for boot_method in to_list(bootstrap_method): + try: + if create_new_bootstraps: + self.create_bootstrap_forecast(bootstrap_type=boot_type, bootstrap_method=boot_method) + boot_skill_score = self.calculate_bootstrap_skill_scores(bootstrap_type=boot_type, + bootstrap_method=boot_method) + self.bootstrap_skill_scores[boot_type][boot_method] = boot_skill_score + except FileNotFoundError: + if _iter != 0: + raise RuntimeError(f"bootstrap_postprocessing ({boot_type}, {boot_type}) was called for the 2nd" + f" time. This means, that something internally goes wrong. Please check for " + f"possible errors") + logging.info(f"Could not load all files for bootstrapping ({boot_type}, {boot_type}), restart " + f"bootstrap postprocessing with create_new_bootstraps=True.") + self.bootstrap_postprocessing(True, _iter=1, bootstrap_type=boot_type, bootstrap_method=boot_method) + + def create_bootstrap_forecast(self, bootstrap_type, bootstrap_method) -> None: """ Create bootstrapped predictions for all stations and variables. @@ -168,16 +180,15 @@ class PostProcessing(RunEnvironment): `bootstraps_labels_{station}.nc`. """ # forecast - with TimeTracking(name=inspect.stack()[0].function): + with TimeTracking(name=f"{inspect.stack()[0].function} ({bootstrap_type}, {bootstrap_method})"): # extract all requirements from data store - bootstrap_path = self.data_store.get("bootstrap_path") forecast_path = self.data_store.get("forecast_path") number_of_bootstraps = self.data_store.get("number_of_bootstraps", "postprocessing") dims = ["index", self.ahead_dim, "type"] for station in self.test_data: - logging.info(str(station)) X, Y = None, None - bootstraps = BootStraps(station, number_of_bootstraps) + bootstraps = BootStraps(station, number_of_bootstraps, bootstrap_type=bootstrap_type, + bootstrap_method=bootstrap_method) for boot in bootstraps: X, Y, (index, dimension) = boot # make bootstrap predictions @@ -188,18 +199,19 @@ class PostProcessing(RunEnvironment): bootstrap_predictions = np.expand_dims(bootstrap_predictions, axis=-1) shape = bootstrap_predictions.shape coords = (range(shape[0]), range(1, shape[1] + 1)) - var = f"{index}_{dimension}" + var = f"{index}_{dimension}" if index is not None else str(dimension) tmp = xr.DataArray(bootstrap_predictions, coords=(*coords, [var]), dims=dims) - file_name = os.path.join(forecast_path, f"bootstraps_{station}_{var}.nc") + file_name = os.path.join(forecast_path, + f"bootstraps_{station}_{var}_{bootstrap_type}_{bootstrap_method}.nc") tmp.to_netcdf(file_name) else: # store also true labels for each station labels = np.expand_dims(Y, axis=-1) - file_name = os.path.join(forecast_path, f"bootstraps_{station}_labels.nc") + file_name = os.path.join(forecast_path, f"bootstraps_{station}_{bootstrap_method}_labels.nc") labels = xr.DataArray(labels, coords=(*coords, ["obs"]), dims=dims) labels.to_netcdf(file_name) - def calculate_bootstrap_skill_scores(self) -> Dict[str, xr.DataArray]: + def calculate_bootstrap_skill_scores(self, bootstrap_type, bootstrap_method) -> Dict[str, xr.DataArray]: """ Calculate skill score of bootstrapped variables. @@ -209,53 +221,64 @@ class PostProcessing(RunEnvironment): :return: The result dictionary with station-wise skill scores """ - with TimeTracking(name=inspect.stack()[0].function): + with TimeTracking(name=f"{inspect.stack()[0].function} ({bootstrap_type}, {bootstrap_method})"): # extract all requirements from data store - bootstrap_path = self.data_store.get("bootstrap_path") forecast_path = self.data_store.get("forecast_path") number_of_bootstraps = self.data_store.get("number_of_bootstraps", "postprocessing") forecast_file = f"forecasts_norm_%s_test.nc" - bootstraps = BootStraps(self.test_data[0], number_of_bootstraps).bootstraps() - skill_scores = statistics.SkillScores(None) + + bootstraps = BootStraps(self.test_data[0], number_of_bootstraps, bootstrap_type=bootstrap_type, + bootstrap_method=bootstrap_method) + number_of_bootstraps = bootstraps.number_of_bootstraps + bootstrap_iter = bootstraps.bootstraps() + skill_scores = statistics.SkillScores(None, ahead_dim=self.ahead_dim) score = {} for station in self.test_data: - logging.info(station) - # get station labels - file_name = os.path.join(forecast_path, f"bootstraps_{str(station)}_labels.nc") - labels = xr.open_dataarray(file_name) + file_name = os.path.join(forecast_path, f"bootstraps_{str(station)}_{bootstrap_method}_labels.nc") + with xr.open_dataarray(file_name) as da: + labels = da.load() shape = labels.shape # get original forecasts orig = self.get_orig_prediction(forecast_path, forecast_file % str(station), number_of_bootstraps) orig = orig.reshape(shape) coords = (range(shape[0]), range(1, shape[1] + 1), ["orig"]) - orig = xr.DataArray(orig, coords=coords, dims=["index", "ahead", "type"]) + orig = xr.DataArray(orig, coords=coords, dims=["index", self.ahead_dim, "type"]) # calculate skill scores for each variable skill = pd.DataFrame(columns=range(1, self.window_lead_time + 1)) - for boot_set in bootstraps: - boot_var = f"{boot_set[0]}_{boot_set[1]}" - file_name = os.path.join(forecast_path, f"bootstraps_{station}_{boot_var}.nc") - boot_data = xr.open_dataarray(file_name) + for boot_set in bootstrap_iter: + boot_var = f"{boot_set[0]}_{boot_set[1]}" if isinstance(boot_set, tuple) else str(boot_set) + file_name = os.path.join(forecast_path, + f"bootstraps_{station}_{boot_var}_{bootstrap_type}_{bootstrap_method}.nc") + with xr.open_dataarray(file_name) as da: + boot_data = da.load() boot_data = boot_data.combine_first(labels).combine_first(orig) boot_scores = [] for ahead in range(1, self.window_lead_time + 1): - data = boot_data.sel(ahead=ahead) + data = boot_data.sel({self.ahead_dim: ahead}) boot_scores.append( skill_scores.general_skill_score(data, forecast_name=boot_var, reference_name="orig")) skill.loc[boot_var] = np.array(boot_scores) # collect all results in single dictionary - score[str(station)] = xr.DataArray(skill, dims=["boot_var", "ahead"]) + score[str(station)] = xr.DataArray(skill, dims=["boot_var", self.ahead_dim]) return score def get_orig_prediction(self, path, file_name, number_of_bootstraps, prediction_name=None): if prediction_name is None: prediction_name = self.forecast_indicator file = os.path.join(path, file_name) - prediction = xr.open_dataarray(file).sel(type=prediction_name).squeeze() - vals = np.tile(prediction.data, (number_of_bootstraps, 1)) + with xr.open_dataarray(file) as da: + prediction = da.load().sel(type=prediction_name).squeeze() + return self.repeat_data(prediction, number_of_bootstraps) + + @staticmethod + def repeat_data(data, number_of_repetition): + if isinstance(data, xr.DataArray): + data = data.data + vals = np.tile(data, (number_of_repetition, 1)) return vals[~np.isnan(vals).any(axis=1), :] def _get_model_name(self): @@ -317,8 +340,16 @@ class PostProcessing(RunEnvironment): try: if (self.bootstrap_skill_scores is not None) and ("PlotBootstrapSkillScore" in plot_list): - PlotBootstrapSkillScore(self.bootstrap_skill_scores, plot_folder=self.plot_path, - model_setup=self.forecast_indicator) + for boot_type, boot_data in self.bootstrap_skill_scores.items(): + for boot_method, boot_skill_score in boot_data.items(): + try: + PlotBootstrapSkillScore(boot_skill_score, plot_folder=self.plot_path, + model_setup=self.forecast_indicator, sampling=self._sampling, + ahead_dim=self.ahead_dim, separate_vars=to_list(self.target_var), + bootstrap_type=boot_type, bootstrap_method=boot_method) + except Exception as e: + logging.error(f"Could not create plot PlotBootstrapSkillScore ({boot_type}, {boot_method}) " + f"due to the following error: {e}") except Exception as e: logging.error(f"Could not create plot PlotBootstrapSkillScore due to the following error: {e}") @@ -495,8 +526,8 @@ class PostProcessing(RunEnvironment): """ path = os.path.join(self.competitor_path, competitor_name) file = os.path.join(path, f"forecasts_{station_name}_test.nc") - data = xr.open_dataarray(file) - # data = data.expand_dims(Stations=[station_name]) # ToDo: remove line + with xr.open_dataarray(file) as da: + data = da.load() forecast = data.sel(type=[self.forecast_indicator]) forecast.coords["type"] = [competitor_name] return forecast @@ -652,7 +683,8 @@ class PostProcessing(RunEnvironment): """ try: file = os.path.join(path, f"forecasts_{str(station)}_train_val.nc") - return xr.open_dataarray(file) + with xr.open_dataarray(file) as da: + return da.load() except (IndexError, KeyError, FileNotFoundError): return None @@ -667,7 +699,8 @@ class PostProcessing(RunEnvironment): """ try: file = os.path.join(path, f"forecasts_{str(station)}_test.nc") - return xr.open_dataarray(file) + with xr.open_dataarray(file) as da: + return da.load() except (IndexError, KeyError, FileNotFoundError): return None @@ -709,14 +742,14 @@ class PostProcessing(RunEnvironment): competitor = self.load_competitors(station) combined = self._combine_forecasts(external_data, competitor, dim="type") model_list = remove_items(list(combined.type.values), "obs") if combined is not None else None - skill_score = statistics.SkillScores(combined, models=model_list) + skill_score = statistics.SkillScores(combined, models=model_list, ahead_dim=self.ahead_dim) if external_data is not None: - skill_score_competitive[station] = skill_score.skill_scores(self.window_lead_time) + skill_score_competitive[station] = skill_score.skill_scores() internal_data = self._get_internal_data(station, path) if internal_data is not None: skill_score_climatological[station] = skill_score.climatological_skill_scores( - internal_data, self.window_lead_time, forecast_name=self.forecast_indicator) + internal_data, forecast_name=self.forecast_indicator) errors.update({"total": self.calculate_average_errors(errors)}) return skill_score_competitive, skill_score_climatological, errors diff --git a/test/test_data_handler/old_t_bootstraps.py b/test/test_data_handler/old_t_bootstraps.py index 9616ed3f457d74e44e8a9eae5a3ed862fa804011..21c18c6c2d6f6a6a38a41250f00d3d14a29ed457 100644 --- a/test/test_data_handler/old_t_bootstraps.py +++ b/test/test_data_handler/old_t_bootstraps.py @@ -160,7 +160,7 @@ class TestCreateShuffledData: def test_shuffle(self, shuffled_data_no_creation): dummy = np.array([[1, 2, 3], [1, 2, 3], [1, 2, 3], [1, 2, 3]]) - res = shuffled_data_no_creation.shuffle(dummy, chunks=(2, 3)).compute() + res = shuffled_data_no_creation.apply_bootstrap_method(dummy, chunks=(2, 3)).compute() assert res.shape == dummy.shape assert dummy.max() >= res.max() assert dummy.min() <= res.min()