diff --git a/src/data_handling/iterator.py b/src/data_handling/iterator.py
index d2ef9eb8df6373934e30ef9ca98c5de3fefed6c9..883ba4df7d59a40f2d00247b62daa8936aa79f63 100644
--- a/src/data_handling/iterator.py
+++ b/src/data_handling/iterator.py
@@ -55,8 +55,9 @@ class DataCollection(Iterable):
 class KerasIterator(keras.utils.Sequence):
 
     def __init__(self, collection: DataCollection, batch_size: int, batch_path: str, shuffle_batches: bool = False,
-                 model=None, upsampling=False):
+                 model=None, upsampling=False, name=None):
         self._collection = collection
+        batch_path = os.path.join(batch_path, str(name) if name is not None else id(self))
         self._path = os.path.join(batch_path, "%i.pickle")
         self.batch_size = batch_size
         self.model = model
diff --git a/src/model_modules/linear_model.py b/src/model_modules/linear_model.py
index e556f0358a2a5e5247f7b6cc7d416af25a8a664d..85c062119d881635bf54aadb7491a3c0298e64f5 100644
--- a/src/model_modules/linear_model.py
+++ b/src/model_modules/linear_model.py
@@ -42,21 +42,27 @@ class OrdinaryLeastSquaredModel:
         return self.ordinary_least_squared_model(self.x, self.y)
 
     def _set_x_y_from_generator(self):
-        data_x = None
-        data_y = None
+        data_x, data_y = None, None
         for item in self.generator:
-            x = self.reshape_xarray_to_numpy(item[0])
-            y = item[1].values
-            data_x = np.concatenate((data_x, x), axis=0) if data_x is not None else x
-            data_y = np.concatenate((data_y, y), axis=0) if data_y is not None else y
-        self.x = data_x
-        self.y = data_y
+            x, y = item.get_data(as_numpy=True)
+            x = self.flatten(x)
+            data_x = self._concatenate(x, data_x)
+            data_y = self._concatenate(y, data_y)
+        self.x, self.y = np.concatenate(data_x, axis=1), data_y[0]
+
+    def _concatenate(self, new, old):
+        return list(map(lambda n1, n2: np.concatenate((n1, n2), axis=0), old, new)) if old is not None else new
 
     def predict(self, data):
         """Apply OLS model on data."""
         data = sm.add_constant(self.reshape_xarray_to_numpy(data), has_constant="add")
         return np.atleast_2d(self.model.predict(data))
 
+    @staticmethod
+    def flatten(data):
+        shapes = list(map(lambda x: x.shape, data))
+        return list(map(lambda x, shape: x.reshape(shape[0], -1), data, shapes))
+
     @staticmethod
     def reshape_xarray_to_numpy(data):
         """Reshape xarray data to numpy data and flatten."""
diff --git a/src/model_modules/model_class.py b/src/model_modules/model_class.py
index 6b3b9972bc0af4c968f2831963cc18446ff09162..cfe058f2a5682a70475818916c6e160f51efc7b3 100644
--- a/src/model_modules/model_class.py
+++ b/src/model_modules/model_class.py
@@ -387,14 +387,15 @@ class MyLittleModel(AbstractModelClass):
         x_in = self.activation()(x_in)
         x_in = keras.layers.Dense(self.window_lead_time, name='{}_Dense'.format("major"))(x_in)
         out_main = self.activation()(x_in)
-        self.model = keras.Model(inputs=[x_input], outputs=[out_main])
+        self.model = keras.Model(inputs=x_input, outputs=[out_main])
 
     def set_compile_options(self):
         self.initial_lr = 1e-2
-        self.optimizer = keras.optimizers.SGD(lr=self.initial_lr, momentum=0.9)
+        # self.optimizer = keras.optimizers.SGD(lr=self.initial_lr, momentum=0.9)
+        self.optimizer = keras.optimizers.adam(lr=self.initial_lr)
         self.lr_decay = src.model_modules.keras_extensions.LearningRateDecay(base_lr=self.initial_lr, drop=.94,
                                                                              epochs_drop=10)
-        self.compile_options = {"loss": keras.losses.mean_squared_error, "metrics": ["mse", "mae"]}
+        self.compile_options = {"loss": [keras.losses.mean_squared_error], "metrics": ["mse", "mae"]}
 
 
 class MyBranchedModel(AbstractModelClass):
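Note on the linear_model.py hunk above: `item.get_data(as_numpy=True)` is expected to return one numpy array per input branch and per target, so the new `flatten`/`_concatenate` helpers build the OLS design matrix branch-wise. A minimal numpy-only sketch of that reshaping, with two hypothetical input branches and illustrative shapes (not taken from the patch):

import numpy as np

# two assumed input branches, shaped (samples, window, 1, variables)
x_branches = [np.zeros((10, 7, 1, 3)), np.zeros((10, 13, 1, 2))]

# flatten: keep the sample axis, collapse everything else per branch
flat = [a.reshape(a.shape[0], -1) for a in x_branches]   # shapes (10, 21) and (10, 26)

# _concatenate would stack further items along axis 0 per branch; the final
# design matrix then joins the flattened branches along the feature axis
design_matrix = np.concatenate(flat, axis=1)              # shape (10, 47)
print(design_matrix.shape)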
diff --git a/src/run.py b/src/run.py
index 4033d52303035ede583529169e93548ab7a205e1..31127cfe81f0abcea753c9987c78cbeb29a696b4 100644
--- a/src/run.py
+++ b/src/run.py
@@ -40,4 +40,8 @@ def run(stations=None,
 
 if __name__ == "__main__":
     from src.data_handling.advanced_data_handling import CustomDataClass
-    run(data_preparation=CustomDataClass, statistics_per_var={'o3': 'dma8eu'}, transformation={})
+    run(data_preparation=CustomDataClass, statistics_per_var={'o3': 'dma8eu'}, transformation={"scope": "data",
+                                                                                               "method": "standardise",
+                                                                                               "mean": 50,
+                                                                                               "std": 50},
+        trainable=False, create_new_model=False)
diff --git a/src/run_modules/post_processing.py b/src/run_modules/post_processing.py
index b97d28c1cf71d35526207450d6b0bb386ddefdb7..2512244c8f9516becfb0edec48a3c9f82e5643de 100644
--- a/src/run_modules/post_processing.py
+++ b/src/run_modules/post_processing.py
@@ -13,7 +13,7 @@ import numpy as np
 import pandas as pd
 import xarray as xr
 
-from src.data_handling import BootStraps, Distributor, DataGenerator, DataPrepJoin
+from src.data_handling import BootStraps, Distributor, DataGenerator, DataPrepJoin, KerasIterator
 from src.helpers.datastore import NameNotFoundInDataStore
 from src.helpers import TimeTracking, statistics
 from src.model_modules.linear_model import OrdinaryLeastSquaredModel
@@ -65,11 +65,12 @@ class PostProcessing(RunEnvironment):
         self.model: keras.Model = self._load_model()
         self.ols_model = None
         self.batch_size: int = self.data_store.get_default("batch_size", "model", 64)
-        self.test_data: DataGenerator = self.data_store.get("generator", "test")
-        self.test_data_distributed = Distributor(self.test_data, self.model, self.batch_size)
-        self.train_data: DataGenerator = self.data_store.get("generator", "train")
-        self.val_data: DataGenerator = self.data_store.get("generator", "val")
-        self.train_val_data: DataGenerator = self.data_store.get("generator", "train_val")
+        self.test_data = self.data_store.get("data_collection", "test")
+        batch_path = self.data_store.get("batch_path", scope="test")
+        self.test_data_distributed = KerasIterator(self.test_data, self.batch_size, model=self.model, name="test", batch_path=batch_path)
+        self.train_data = self.data_store.get("data_collection", "train")
+        self.val_data = self.data_store.get("data_collection", "val")
+        self.train_val_data = self.data_store.get("data_collection", "train_val")
         self.plot_path: str = self.data_store.get("plot_path")
         self.target_var = self.data_store.get("target_var")
         self._sampling = self.data_store.get("sampling")
@@ -311,17 +312,17 @@ class PostProcessing(RunEnvironment):
         be found inside `forecast_path`.
""" logging.debug("start make_prediction") - for i, _ in enumerate(self.test_data): - data = self.test_data.get_data_generator(i) - input_data = data.get_transposed_history() + for i, data in enumerate(self.test_data): + input_data = data.get_X() + target_data = data.get_Y() # get scaling parameters - mean, std, transformation_method = data.get_transformation_information(variable=self.target_var) + # mean, std, transformation_method = data.get_transformation_information(variable=self.target_var) for normalised in [True, False]: # create empty arrays nn_prediction, persistence_prediction, ols_prediction, observation = self._create_empty_prediction_arrays( - data, count=4) + target_data, count=4) # nn forecast nn_prediction = self._create_nn_forecast(input_data, nn_prediction, mean, std, transformation_method, @@ -459,8 +460,8 @@ class PostProcessing(RunEnvironment): return nn_prediction @staticmethod - def _create_empty_prediction_arrays(generator, count=1): - return [generator.label.copy() for _ in range(count)] + def _create_empty_prediction_arrays(target_data, count=1): + return [target_data.copy() for _ in range(count)] @staticmethod def create_fullindex(df: Union[xr.DataArray, pd.DataFrame, pd.DatetimeIndex], freq: str) -> pd.DataFrame: diff --git a/src/run_modules/pre_processing.py b/src/run_modules/pre_processing.py index 72493c1fbad42a7aa9fec1e32292c0727a7dfb38..c5f10e8b9eac0660753c2af858d74d5def31ced9 100644 --- a/src/run_modules/pre_processing.py +++ b/src/run_modules/pre_processing.py @@ -197,43 +197,6 @@ class PreProcessing(RunEnvironment): self.data_store.set("stations", valid_stations, scope=set_name) self.data_store.set("data_collection", collection, scope=set_name) - def create_set_split(self, index_list: slice, set_name: str) -> None: - """ - Create subsets and store in data store. - - Create the subset for given split index and stores the data_collection with given set name in data store as - `data_collection`. Check for all valid stations using the default (kw)args for given scope and create the - data_collection for all valid stations. Also set all transformation information, if subset is training set. Make - sure, that the train set is executed first, and all other subsets afterwards. - - :param index_list: list of all stations to use for the set. If attribute use_all_stations_on_all_data_sets=True, - this list is ignored. - :param set_name: name to load/save all information from/to data store. 
- """ - args = self.data_store.create_args_dict(DEFAULT_ARGS_LIST, scope=set_name) - kwargs = self.data_store.create_args_dict(DEFAULT_KWARGS_LIST, scope=set_name) - stations = args["stations"] - if self.data_store.get("use_all_stations_on_all_data_sets"): - set_stations = stations - else: - set_stations = stations[index_list] - logging.debug(f"{set_name.capitalize()} stations (len={len(set_stations)}): {set_stations}") - # validate set - set_stations = self.check_valid_stations(args, kwargs, set_stations, load_tmp=False, name=set_name) - self.data_store.set("stations", set_stations, scope=set_name) - # create set data_collection and store - set_args = self.data_store.create_args_dict(DEFAULT_ARGS_LIST, scope=set_name) - data_prep_kwargs = self.data_store.create_args_dict(["interpolate_dim", "data_path", "min_length", "extreme_values", "extremes_on_right_tail_only"], scope=set_name) - collection = DataCollection() - for station in set_stations: - args["station"] = station - - def f(sp_args, sp_kwargs, dp_kwargs): - DataPreparation(StationPrep(**sp_args, **sp_kwargs), **dp_kwargs) - - collection.add(f(**set_args, **kwargs, **data_prep_kwargs)) - self.data_store.set("data_collection", collection, scope=set_name) - @staticmethod def check_valid_stations(args: Dict, kwargs: Dict, all_stations: List[str], load_tmp=True, save_tmp=True, name=None): diff --git a/src/run_modules/training.py b/src/run_modules/training.py index a92fd56fda5599489992b1bccaca3a715dd622d7..4ca0063cc4c6446e80db91626fe535e613cd7c82 100644 --- a/src/run_modules/training.py +++ b/src/run_modules/training.py @@ -104,7 +104,7 @@ class Training(RunEnvironment): """ collection = self.data_store.get("data_collection", mode) kwargs = self.data_store.create_args_dict(["upsampling", "shuffle_batches", "batch_path"], scope=mode) - setattr(self, f"{mode}_set", KerasIterator(collection, self.batch_size, model=self.model, **kwargs)) + setattr(self, f"{mode}_set", KerasIterator(collection, self.batch_size, model=self.model, name=mode, **kwargs)) def set_generators(self) -> None: """