diff --git a/mlair/data_handler/abstract_data_handler.py b/mlair/data_handler/abstract_data_handler.py
index 9ea163fcad2890580e9c44e4bda0627d6419dc9f..a82e5005e8b30f9e3978ae61859e6b80746d95f1 100644
--- a/mlair/data_handler/abstract_data_handler.py
+++ b/mlair/data_handler/abstract_data_handler.py
@@ -22,6 +22,9 @@ class AbstractDataHandler(object):
         """Return initialised class."""
         return cls(*args, **kwargs)
 
+    def __len__(self, upsampling=False):
+        raise NotImplementedError
+
     @classmethod
     def requirements(cls, skip_args=None):
         """Return requirements and own arguments without duplicates."""
diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py
index ae46ad918630f2a5f083c62b558609e85d7cc2d8..69c9537b10ca583adf84480636680a99ab265a67 100644
--- a/mlair/data_handler/default_data_handler.py
+++ b/mlair/data_handler/default_data_handler.py
@@ -55,6 +55,8 @@ class DefaultDataHandler(AbstractDataHandler):
         self._X_extreme = None
         self._Y_extreme = None
         self._data_intersection = None
+        self._len = None
+        self._len_upsampling = None
         self._use_multiprocessing = use_multiprocessing
         self._max_number_multiprocessing = max_number_multiprocessing
         _name_affix = str(f"{str(self.id_class)}_{name_affix}" if name_affix is not None else id(self))
@@ -134,6 +136,12 @@ class DefaultDataHandler(AbstractDataHandler):
     def __repr__(self):
         return str(self._collection[0])
 
+    def __len__(self, upsampling=False):
+        if upsampling is False:
+            return self._len
+        else:
+            return self._len_upsampling
+
     def get_X_original(self):
         X = []
         for data in self._collection:
@@ -174,6 +182,7 @@ class DefaultDataHandler(AbstractDataHandler):
             Y = Y_original.sel({dim: intersect})
         self._data_intersection = intersect
         self._X, self._Y = X, Y
+        self._len = len(self._data_intersection)
 
     def get_observation(self):
         dim = self.time_dim
@@ -208,6 +217,7 @@ if extreme_values is None:
             logging.debug(f"No extreme values given, skip multiply extremes")
             self._X_extreme, self._Y_extreme = self._X, self._Y
+            self._len_upsampling = self._len
             return
 
         # check type if inputs
@@ -243,6 +253,7 @@
 
         self._Y_extreme = xr.concat([Y, extremes_Y], dim=dim)
         self._X_extreme = list(map(lambda x1, x2: xr.concat([x1, x2], dim=dim), X, extremes_X))
+        self._len_upsampling = len(self._X_extreme[0].coords[dim])
 
     @staticmethod
     def _add_timedelta(data, dim, timedelta):
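Annotation (not part of the patch): the `__len__(upsampling=...)` protocol introduced above lets the batch iterator query how many samples a handler will yield without materialising the data. `_len` counts samples after the X/Y intersection, while `_len_upsampling` additionally counts replicated extreme events. A minimal sketch with a hypothetical class name; note that the built-in `len()` never forwards the extra argument, which is why the iterator later calls `data.__len__(self.upsampling)` explicitly:

```python
# Sketch only: a handler satisfying the new __len__(upsampling) protocol.
class SketchDataHandler:
    def __init__(self, n_samples, n_extremes=0):
        self._len = n_samples                          # samples after intersection
        self._len_upsampling = n_samples + n_extremes  # incl. replicated extremes

    def __len__(self, upsampling=False):
        return self._len_upsampling if upsampling else self._len


handler = SketchDataHandler(100, n_extremes=20)
assert len(handler) == 100                      # built-in len() uses the default
assert handler.__len__(upsampling=True) == 120  # explicit call reaches the flag
```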
diff --git a/mlair/data_handler/iterator.py b/mlair/data_handler/iterator.py
index 3fc25a90f861c65d38aa6b7019095210035d4c2d..7aae1837e490d116b8dbeae31c5aeb6b459f9287 100644
--- a/mlair/data_handler/iterator.py
+++ b/mlair/data_handler/iterator.py
@@ -8,7 +8,8 @@ import numpy as np
 import math
 import os
 import shutil
-import pickle
+import psutil
+import multiprocessing
 import logging
 import dill
 from typing import Tuple, List
@@ -75,7 +76,7 @@ class DataCollection(Iterable):
 class KerasIterator(keras.utils.Sequence):
 
     def __init__(self, collection: DataCollection, batch_size: int, batch_path: str, shuffle_batches: bool = False,
-                 model=None, upsampling=False, name=None):
+                 model=None, upsampling=False, name=None, use_multiprocessing=False, max_number_multiprocessing=1):
         self._collection = collection
         batch_path = os.path.join(batch_path, str(name if name is not None else id(self)))
         self._path = os.path.join(batch_path, "%i.pickle")
@@ -85,7 +86,7 @@ class KerasIterator(keras.utils.Sequence):
         self.upsampling = upsampling
         self.indexes: list = []
         self._cleanup_path(batch_path)
-        self._prepare_batches()
+        self._prepare_batches(use_multiprocessing, max_number_multiprocessing)
 
     def __len__(self) -> int:
         return len(self.indexes)
""" index = 0 - remaining = None + remaining = [] mod_rank = self._get_model_rank() + n_process = min([psutil.cpu_count(logical=False), len(self._collection), max_process]) # use only physical cpus + if n_process > 1 and use_multiprocessing is True: # parallel solution + pool = multiprocessing.Pool(n_process) + output = [] + else: + pool = None + output = None for data in self._collection: - logging.debug(f"prepare batches for {str(data)}") - X, _Y = data.get_data(upsampling=self.upsampling) - Y = [_Y[0] for _ in range(mod_rank)] - if self.upsampling: - X, Y = self._permute_data(X, Y) - if remaining is not None: - X, Y = self._concatenate(X, remaining[0]), self._concatenate(Y, remaining[1]) + length = data.__len__(self.upsampling) + batches = _get_number_of_mini_batches(length, self.batch_size) + if pool is None: + res = f_proc(data, self.upsampling, mod_rank, self.batch_size, self._path, index) + if res is not None: + remaining.append(res) + else: + output.append(pool.apply_async(f_proc, args=(data, self.upsampling, mod_rank, self.batch_size, self._path, index))) + index += batches + if output is not None: + for p in output: + res = p.get() + if res is not None: + remaining.append(res) + pool.close() + if len(remaining) > 0: + X = self._concatenate_multi(*[e[0] for e in remaining]) + Y = self._concatenate_multi(*[e[1] for e in remaining]) length = X[0].shape[0] - batches = self._get_number_of_mini_batches(length) - for b in range(batches): - batch_X, batch_Y = self._get_batch(X, b), self._get_batch(Y, b) - self._save_to_pickle(X=batch_X, Y=batch_Y, index=index) + batches = _get_number_of_mini_batches(length, self.batch_size) + remaining = f_proc((X, Y), self.upsampling, mod_rank, self.batch_size, self._path, index) + index += batches + if remaining is not None: + _save_to_pickle(self._path, X=remaining[0], Y=remaining[1], index=index) index += 1 - if (batches * self.batch_size) < length: # keep remaining to concatenate with next data element - remaining = (self._get_batch(X, batches), self._get_batch(Y, batches)) - else: - remaining = None - if remaining is not None: # add remaining as smaller batch - self._save_to_pickle(X=remaining[0], Y=remaining[1], index=index) - index += 1 self.indexes = np.arange(0, index).tolist() - - def _save_to_pickle(self, X: List[np.ndarray], Y: List[np.ndarray], index: int) -> None: - """Save data as pickle file with variables X and Y and given index as <index>.pickle .""" - data = {"X": X, "Y": Y} - file = self._path % index - with open(file, "wb") as f: - dill.dump(data, f) - - def _get_number_of_mini_batches(self, number_of_samples: int) -> int: - """Return number of mini batches as the floored ration of number of samples to batch size.""" - return math.floor(number_of_samples / self.batch_size) + if pool is not None: + pool.join() @staticmethod def _cleanup_path(path: str, create_new: bool = True) -> None: @@ -188,3 +188,49 @@ class KerasIterator(keras.utils.Sequence): """Randomly shuffle indexes if enabled.""" if self.shuffle is True: np.random.shuffle(self.indexes) + + +def _save_to_pickle(path, X: List[np.ndarray], Y: List[np.ndarray], index: int) -> None: + """Save data as pickle file with variables X and Y and given index as <index>.pickle .""" + data = {"X": X, "Y": Y} + file = path % index + with open(file, "wb") as f: + dill.dump(data, f) + + +def _get_batch(data_list: List[np.ndarray], b: int, batch_size: int) -> List[np.ndarray]: + """Get batch according to batch size from data list.""" + return list(map(lambda data: data[b * batch_size:(b + 1) 
diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py
index 6b3895f3538379ab0a5faed87de072a711c17d5f..1500cdab23fca3058bffc838b6855fd0b3455f3d 100644
--- a/mlair/run_modules/post_processing.py
+++ b/mlair/run_modules/post_processing.py
@@ -349,7 +349,8 @@ class PostProcessing(RunEnvironment):
             return d[..., pos]
 
         # forecast
-        with TimeTracking(name=f"{inspect.stack()[0].function} ({bootstrap_type}, {bootstrap_method})"):
+        with TimeTracking(name=f"{inspect.stack()[0].function} ({bootstrap_type}, {bootstrap_method})",
+                          log_on_enter=True):
             # extract all requirements from data store
             number_of_bootstraps = self.data_store.get("n_boots", "feature_importance")
             dims = [self.uncertainty_estimate_boot_dim, self.index_dim, self.ahead_dim, self.model_type_dim]
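Annotation (not part of the patch): `TimeTracking` is MLAir's context manager for runtime logging; `log_on_enter=True` is presumably meant to emit a log line already when the block is entered, so the long-running bootstrap loop announces itself before it completes rather than only afterwards. A hedged usage sketch under that assumption:

```python
from mlair.helpers import TimeTracking

# Assuming log_on_enter behaves as described above, this logs once on entry
# and again with the elapsed time when the block exits.
with TimeTracking(name="feature importance (branch, shuffle)", log_on_enter=True):
    pass  # placeholder for the long-running bootstrap computation
```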
diff --git a/mlair/run_modules/training.py b/mlair/run_modules/training.py
index 5ce906122ef184d6dcad5527e923e44f04028fe5..0d7bb98f109b612cf3cffc3dc31541bb1733c541 100644
--- a/mlair/run_modules/training.py
+++ b/mlair/run_modules/training.py
@@ -19,7 +19,7 @@ from mlair.model_modules.keras_extensions import CallbackHandler
 from mlair.plotting.training_monitoring import PlotModelHistory, PlotModelLearningRate
 from mlair.run_modules.run_environment import RunEnvironment
 from mlair.configuration import path_config
-from mlair.helpers import to_list, tables
+from mlair.helpers import to_list, tables, TimeTrackingWrapper
 
 
 class Training(RunEnvironment):
@@ -102,6 +102,7 @@ class Training(RunEnvironment):
         """
         self.model.make_predict_function()
 
+    @TimeTrackingWrapper
     def _set_gen(self, mode: str) -> None:
         """
         Set and distribute the generators for given mode regarding batch size.
@@ -109,9 +110,11 @@
         :param mode: name of set, should be from ["train", "val", "test"]
         """
         collection = self.data_store.get("data_collection", mode)
-        kwargs = self.data_store.create_args_dict(["upsampling", "shuffle_batches", "batch_path"], scope=mode)
+        kwargs = self.data_store.create_args_dict(["upsampling", "shuffle_batches", "batch_path", "use_multiprocessing",
+                                                   "max_number_multiprocessing"], scope=mode)
         setattr(self, f"{mode}_set", KerasIterator(collection, self.batch_size, model=self.model, name=mode, **kwargs))
 
+    @TimeTrackingWrapper
     def set_generators(self) -> None:
         """
         Set all generators for training, validation, and testing subsets.
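Annotation (not part of the patch): `TimeTrackingWrapper` decorates a function so that its wall-clock runtime is logged when it returns; applying it to `_set_gen` and `set_generators` makes the cost of the (now possibly parallel) batch preparation visible in the training log. A minimal sketch with a hypothetical function:

```python
from mlair.helpers import TimeTrackingWrapper

@TimeTrackingWrapper
def build_batches():
    # stands in for KerasIterator(collection, batch_size, ..., use_multiprocessing=True)
    return sum(range(10 ** 6))

build_batches()  # the runtime of this call is logged by the wrapper
```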
diff --git a/test/test_data_handler/test_iterator.py b/test/test_data_handler/test_iterator.py
index bb8ecb5d216519b3662a5baa4d463780b4c29d8c..b5fb30a90c99d33e9dfe3db1346cfd7f43549fc9 100644
--- a/test/test_data_handler/test_iterator.py
+++ b/test/test_data_handler/test_iterator.py
@@ -1,4 +1,5 @@
 from mlair.data_handler.iterator import DataCollection, StandardIterator, KerasIterator
+from mlair.data_handler.iterator import _get_number_of_mini_batches, _get_batch, _permute_data, _save_to_pickle, f_proc
 from mlair.helpers.testing import PyTestAllEqual
 from mlair.model_modules.model_class import MyBranchedModel
 from mlair.model_modules.fully_connected_networks import FCN_64_32_16
@@ -89,6 +90,14 @@ class DummyData:
     def __init__(self, number_of_samples=np.random.randint(100, 150)):
         np.random.seed(45)
         self.number_of_samples = number_of_samples
+        self._len = self.number_of_samples
+        self._len_upsampling = self.number_of_samples
+
+    def __len__(self, upsampling=False):
+        if upsampling is False:
+            return self._len
+        else:
+            return self._len_upsampling
 
     def get_X(self, upsampling=False, as_numpy=True):
         np.random.seed(45)
@@ -152,13 +161,6 @@ class TestKerasIterator:
         iterator._cleanup_path(path, create_new=False)
         assert os.path.exists(path) is False
 
-    def test_get_number_of_mini_batches(self):
-        iterator = object.__new__(KerasIterator)
-        iterator.batch_size = 36
-        assert iterator._get_number_of_mini_batches(30) == 0
-        assert iterator._get_number_of_mini_batches(40) == 1
-        assert iterator._get_number_of_mini_batches(72) == 2
-
     def test_len(self):
         iterator = object.__new__(KerasIterator)
         iterator.indexes = [0, 1, 2, 3, 4, 5]
@@ -175,25 +177,6 @@ class TestKerasIterator:
         for i in range(3):
             assert PyTestAllEqual([new_arr[i], test_arr[i]])
 
-    def test_get_batch(self):
-        arr = DummyData(20).get_X()
-        iterator = object.__new__(KerasIterator)
-        iterator.batch_size = 19
-        batch1 = iterator._get_batch(arr, 0)
-        assert batch1[0].shape[0] == 19
-        batch2 = iterator._get_batch(arr, 1)
-        assert batch2[0].shape[0] == 1
-
-    def test_save_to_pickle(self, path):
-        os.makedirs(path)
-        d = DummyData(20)
-        X, Y = d.get_X(), d.get_Y()
-        iterator = object.__new__(KerasIterator)
-        iterator._path = os.path.join(path, "%i.pickle")
-        assert os.path.exists(iterator._path % 2) is False
-        iterator._save_to_pickle(X=X, Y=Y, index=2)
-        assert os.path.exists(iterator._path % 2) is True
-
     def test_prepare_batches(self, collection, path):
         iterator = object.__new__(KerasIterator)
         iterator._collection = collection
@@ -292,14 +275,111 @@ class TestKerasIterator:
         with pytest.raises(TypeError):
             iterator._get_model_rank()
 
-    def test_permute(self):
-        iterator = object.__new__(KerasIterator)
+
+class TestGetNumberOfMiniBatches:
+
+    def test_get_number_of_mini_batches(self):
+        batch_size = 36
+        assert _get_number_of_mini_batches(30, batch_size) == 0
+        assert _get_number_of_mini_batches(40, batch_size) == 1
+        assert _get_number_of_mini_batches(72, batch_size) == 2
+
+
+class TestGetBatch:
+
+    def test_get_batch(self):
+        arr = DummyData(20).get_X()
+        batch_size = 19
+        batch1 = _get_batch(arr, 0, batch_size)
+        assert batch1[0].shape[0] == 19
+        batch2 = _get_batch(arr, 1, batch_size)
+        assert batch2[0].shape[0] == 1
+
+
+class TestSaveToPickle:
+
+    @pytest.fixture
+    def path(self):
+        p = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata")
+        shutil.rmtree(p, ignore_errors=True) if os.path.exists(p) else None
+        yield p
+        shutil.rmtree(p, ignore_errors=True)
+
+    def test_save_to_pickle(self, path):
+        os.makedirs(path)
+        d = DummyData(20)
+        X, Y = d.get_X(), d.get_Y()
+        _path = os.path.join(path, "%i.pickle")
+        assert os.path.exists(_path % 2) is False
+        _save_to_pickle(_path, X=X, Y=Y, index=2)
+        assert os.path.exists(_path % 2) is True
+
+
+class TestPermuteData:
+
+    def test_permute_data(self):
         X = [np.array([[1, 2, 3, 4],
                        [1.1, 2.1, 3.1, 4.1],
                        [1.2, 2.2, 3.2, 4.2]], dtype="f2")]
         Y = [np.array([1, 2, 3])]
-        X_p, Y_p = iterator._permute_data(X, Y)
+        X_p, Y_p = _permute_data(X, Y)
         assert X_p[0].shape == X[0].shape
         assert Y_p[0].shape == Y[0].shape
         assert np.testing.assert_almost_equal(X_p[0].sum(), X[0].sum(), 2) is None
         assert np.testing.assert_almost_equal(Y_p[0].sum(), Y[0].sum(), 2) is None
+
+
+class TestFProc:
+
+    @pytest.fixture
+    def collection(self):
+        coll = []
+        for i in range(3):
+            coll.append(DummyData(50 + i))
+        data_coll = DataCollection(collection=coll)
+        return data_coll
+
+    @pytest.fixture
+    def collection_small(self):
+        coll = []
+        for i in range(3):
+            coll.append(DummyData(5 + i))
+        data_coll = DataCollection(collection=coll)
+        return data_coll
+
+    @pytest.fixture
+    def path(self):
+        p = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata")
+        shutil.rmtree(p, ignore_errors=True) if os.path.exists(p) else None
+        os.makedirs(p)
+        yield p
+        shutil.rmtree(p, ignore_errors=True)
+
+    def test_f_proc(self, collection, path):
+        data = collection[0]
+        upsampling = False
+        mod_rank = 2
+        batch_size = 32
+        remaining = f_proc(data, upsampling, mod_rank, batch_size, os.path.join(path, "%i.pickle"), 0)
+        assert isinstance(remaining, tuple)
+        assert len(remaining) == 2
+        assert isinstance(remaining[0], list)
+        assert len(remaining[0]) == 3
+        assert remaining[0][0].shape == (18, 14, 5)
+
+    def test_f_proc_no_remaining(self, collection, path):
+        data = collection[0]
+        upsampling = False
+        mod_rank = 2
+        batch_size = 50
+        remaining = f_proc(data, upsampling, mod_rank, batch_size, os.path.join(path, "%i.pickle"), 0)
+        assert remaining is None
+
+    def test_f_proc_X_Y(self, collection, path):
+        data = collection[0]
+        X, Y = data.get_data()
+        upsampling = False
+        mod_rank = 2
+        batch_size = 40
+        remaining = f_proc((X, Y), upsampling, mod_rank, batch_size, os.path.join(path, "%i.pickle"), 0)
+        assert remaining[0][0].shape == (10, 14, 5)
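Annotation (not part of the patch): the shape assertions in `TestFProc` follow directly from the batch arithmetic. `DummyData(50)` yields X arrays of shape `(50, 14, 5)`, so with `batch_size=32` one full batch is written and `50 - 32 = 18` samples come back as the remainder, while `batch_size=50` consumes everything and `batch_size=40` leaves `10`. As a quick check:

```python
import math

# Remainder sizes asserted in TestFProc, derived from floor division.
for length, batch_size, expected in [(50, 32, 18), (50, 50, 0), (50, 40, 10)]:
    full = math.floor(length / batch_size)
    assert length - full * batch_size == expected
```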
diff --git a/test/test_run_modules/test_model_setup.py b/test/test_run_modules/test_model_setup.py
index 6e8d3ea9ebab40c79b17b2fba386322a630f00e1..295954a7cf53927939d50de5a84d474cb1818026 100644
--- a/test/test_run_modules/test_model_setup.py
+++ b/test/test_run_modules/test_model_setup.py
@@ -139,6 +139,14 @@ class DummyData:
 
     def __init__(self, number_of_samples=np.random.randint(100, 150)):
         self.number_of_samples = number_of_samples
+        self._len = self.number_of_samples
+        self._len_upsampling = self.number_of_samples
+
+    def __len__(self, upsampling=False):
+        if upsampling is False:
+            return self._len
+        else:
+            return self._len_upsampling
 
     def get_X(self, upsampling=False, as_numpy=True):
         X1 = np.random.randint(0, 10, size=(self.number_of_samples, 14, 1, 5))  # samples, window, variables