diff --git a/mlair/data_handler/iterator.py b/mlair/data_handler/iterator.py
index 39cceb78b6abdde7a21f0f896fd46c22d29a4a4c..b838626221b18c3f3b55ba15e43ae152d83ce5c6 100644
--- a/mlair/data_handler/iterator.py
+++ b/mlair/data_handler/iterator.py
@@ -13,7 +13,6 @@ import multiprocessing
 import logging
 import dill
 from typing import Tuple, List
-from mlair.helpers import TimeTrackingWrapper
 
 
 class StandardIterator(Iterator):
@@ -87,9 +86,7 @@ class KerasIterator(keras.utils.Sequence):
         self.upsampling = upsampling
         self.indexes: list = []
         self._cleanup_path(batch_path)
-        self._prepare_batches_orig()
         self._prepare_batches_parallel(use_multiprocessing, max_number_multiprocessing)
-        # self._prepare_batches(False, max_number_multiprocessing)
 
     def __len__(self) -> int:
         return len(self.indexes)
@@ -128,18 +125,6 @@ class KerasIterator(keras.utils.Sequence):
         """Concatenate two lists of data along axis=0."""
         return list(map(lambda *_args: np.concatenate(_args, axis=0), *args))
 
-    def _get_batch(self, data_list: List[np.ndarray], b: int) -> List[np.ndarray]:
-        """Get batch according to batch size from data list."""
-        return list(map(lambda data: data[b * self.batch_size:(b + 1) * self.batch_size, ...], data_list))
-
-    @staticmethod
-    def _permute_data(X, Y):
-        p = np.random.permutation(len(X[0]))  # equiv to .shape[0]
-        X = list(map(lambda x: x[p], X))
-        Y = list(map(lambda x: x[p], Y))
-        return X, Y
-
-    @TimeTrackingWrapper
     def _prepare_batches_orig(self):
         """
         Prepare all batches as locally stored files.
@@ -157,26 +142,35 @@ class KerasIterator(keras.utils.Sequence):
             X, _Y = data.get_data(upsampling=self.upsampling)
             Y = [_Y[0] for _ in range(mod_rank)]
             if self.upsampling:
-                X, Y = self._permute_data(X, Y)
+                X, Y = _permute_data(X, Y)
             if remaining is not None:
                 X, Y = self._concatenate(X, remaining[0]), self._concatenate(Y, remaining[1])
             length = X[0].shape[0]
-            batches = self._get_number_of_mini_batches(length)
+            batches = _get_number_of_mini_batches(length, self.batch_size)
             for b in range(batches):
-                batch_X, batch_Y = self._get_batch(X, b), self._get_batch(Y, b)
-                self._save_to_pickle(X=batch_X, Y=batch_Y, index=index)
+                batch_X, batch_Y = _get_batch(X, b, self.batch_size), _get_batch(Y, b, self.batch_size)
+                _save_to_pickle(self._path, X=batch_X, Y=batch_Y, index=index)
                 index += 1
             if (batches * self.batch_size) < length:  # keep remaining to concatenate with next data element
-                remaining = (self._get_batch(X, batches), self._get_batch(Y, batches))
+                remaining = (_get_batch(X, batches, self.batch_size), _get_batch(Y, batches, self.batch_size))
             else:
                 remaining = None
         if remaining is not None:  # add remaining as smaller batch
-            self._save_to_pickle(X=remaining[0], Y=remaining[1], index=index)
+            _save_to_pickle(self._path, X=remaining[0], Y=remaining[1], index=index)
             index += 1
         self.indexes = np.arange(0, index).tolist()
 
-    @TimeTrackingWrapper
     def _prepare_batches_parallel(self, use_multiprocessing=False, max_process=1) -> None:
+        """
+        Prepare all batches as locally stored files.
+
+        Walk through all elements of the collection and split (or merge) the data according to the batch size. Too
+        long data sets are divided into multiple batches. Not fully filled batches are retained together with the
+        remainders from the next collection elements; these retained data are concatenated and also split into
+        batches. If data still remain afterwards, they are saved as a final smaller batch. All batches are numbered
+        by a running index starting at 0, and a list with all batch numbers is stored in the class's parameter
+        indexes. This method can either use a serial approach or use multiprocessing to decrease computational time.
+        """
         index = 0
         remaining = []
         mod_rank = self._get_model_rank()
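The docstring above describes a split-and-carry scheme: each collection element is cut into full batches, and any leftover samples are held back and concatenated with the next element. A minimal, self-contained sketch of that logic (split_into_batches is an illustrative helper, not part of the module):

import math
import numpy as np

def split_into_batches(X: np.ndarray, batch_size: int):
    """Split samples into full batches plus an optional smaller remainder."""
    n_full = math.floor(len(X) / batch_size)  # mirrors _get_number_of_mini_batches
    batches = [X[b * batch_size:(b + 1) * batch_size] for b in range(n_full)]
    remainder = X[n_full * batch_size:]  # carried over to the next collection element
    return batches, (remainder if len(remainder) > 0 else None)

# 10 samples with batch_size=4 yield two full batches and a remainder of 2, which is
# either merged with the next collection element or saved as a final smaller batch.
batches, remainder = split_into_batches(np.arange(10), batch_size=4)
assert [len(b) for b in batches] == [4, 4] and len(remainder) == 2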
@@ -212,77 +206,13 @@ class KerasIterator(keras.utils.Sequence):
                 remaining = f_proc((X, Y), self.upsampling, mod_rank, self.batch_size, self._path, index)
                 index += batches
         if remaining is not None:
-            save_to_pickle(self._path, X=remaining[0], Y=remaining[1], index=index)
-            index += 1
-        self.indexes = np.arange(0, index).tolist()
-        logging.warning(f"hightst index is {index}")
-        if pool is not None:
-            pool.join()
-
-    @TimeTrackingWrapper
-    def _prepare_batches(self, use_multiprocessing=False, max_process=1) -> None:
-        """
-        Prepare all batches as locally stored files.
-
-        Walk through all elements of collection and split (or merge) data according to the batch size. Too long data
-        sets are divided into multiple batches. Not fully filled batches are merged with data from the next collection
-        element. If data is remaining after the last element, it is saved as smaller batch. All batches are enumerated
-        beginning from 0. A list with all batch numbers is stored in class's parameter indexes.
-        """
-        index = 0
-        remaining = None
-        mod_rank = self._get_model_rank()
-        # max_process = 12
-        n_process = min([psutil.cpu_count(logical=False), len(self._collection), max_process])  # use only physical cpus
-        if n_process > 1 and use_multiprocessing is True:  # parallel solution
-            pool = multiprocessing.Pool(n_process)
-        else:
-            pool = None
-        for data in self._collection:
-            logging.debug(f"prepare batches for {str(data)}")
-            X, _Y = data.get_data(upsampling=self.upsampling)
-            Y = [_Y[0] for _ in range(mod_rank)]
-            if self.upsampling:
-                X, Y = self._permute_data(X, Y)
-            if remaining is not None:
-                X, Y = self._concatenate(X, remaining[0]), self._concatenate(Y, remaining[1])
-            length = X[0].shape[0]
-            batches = self._get_number_of_mini_batches(length)
-            output = []
-            for b in range(batches):
-                if pool is None:
-                    output.append(f_proc_keras_gen(X, Y, b, self.batch_size, index, self._path))
-                else:
-                    output.append(pool.apply_async(f_proc_keras_gen, args=(X, Y, b, self.batch_size, index, self._path)))
+            _save_to_pickle(self._path, X=remaining[0], Y=remaining[1], index=index)
             index += 1
-            if pool is not None:
-                [p.get() for p in output]
-            if (batches * self.batch_size) < length:  # keep remaining to concatenate with next data element
-                remaining = (get_batch(X, batches, self.batch_size), get_batch(Y, batches, self.batch_size))
-                # remaining = (self._get_batch(X, batches), self._get_batch(Y, batches))
-            else:
-                remaining = None
-        if remaining is not None:  # add remaining as smaller batch
-            # self._save_to_pickle(X=remaining[0], Y=remaining[1], index=index)
-            save_to_pickle(self._path, X=remaining[0], Y=remaining[1], index=index)
-            index += 1
         self.indexes = np.arange(0, index).tolist()
         logging.warning(f"hightst index is {index}")
         if pool is not None:
-            pool.close()
             pool.join()
 
-    def _save_to_pickle(self, X: List[np.ndarray], Y: List[np.ndarray], index: int) -> None:
-        """Save data as pickle file with variables X and Y and given index as <index>.pickle ."""
-        data = {"X": X, "Y": Y}
-        file = self._path % index
-        with open(file, "wb") as f:
-            dill.dump(data, f)
-
-    def _get_number_of_mini_batches(self, number_of_samples: int) -> int:
-        """Return number of mini batches as the floored ration of number of samples to batch size."""
-        return math.floor(number_of_samples / self.batch_size)
-
     @staticmethod
     def _cleanup_path(path: str, create_new: bool = True) -> None:
         """First remove existing path, second create empty path if enabled."""
@@ -297,12 +227,7 @@ class KerasIterator(keras.utils.Sequence):
             np.random.shuffle(self.indexes)
 
 
-def f_proc_keras_gen(X, Y, batch_number, batch_size, index, path):
-    batch_X, batch_Y = get_batch(X, batch_number, batch_size), get_batch(Y, batch_number, batch_size)
-    save_to_pickle(path, X=batch_X, Y=batch_Y, index=index)
-
-
-def save_to_pickle(path, X: List[np.ndarray], Y: List[np.ndarray], index: int) -> None:
+def _save_to_pickle(path, X: List[np.ndarray], Y: List[np.ndarray], index: int) -> None:
     """Save data as pickle file with variables X and Y and given index as <index>.pickle ."""
     data = {"X": X, "Y": Y}
     file = path % index
@@ -310,7 +235,7 @@ def save_to_pickle(path, X: List[np.ndarray], Y: List[np.ndarray], index: int) -> None:
         dill.dump(data, f)
 
 
-def get_batch(data_list: List[np.ndarray], b: int, batch_size: int) -> List[np.ndarray]:
+def _get_batch(data_list: List[np.ndarray], b: int, batch_size: int) -> List[np.ndarray]:
     """Get batch according to batch size from data list."""
     return list(map(lambda data: data[b * batch_size:(b + 1) * batch_size, ...], data_list))
 
@@ -321,6 +246,7 @@ def _permute_data(X, Y):
     Y = list(map(lambda x: x[p], Y))
     return X, Y
 
+
 def _get_number_of_mini_batches(number_of_samples: int, batch_size: int) -> int:
     """Return number of mini batches as the floored ration of number of samples to batch size."""
     return math.floor(number_of_samples / batch_size)
@@ -337,10 +263,11 @@ def f_proc(data, upsampling, mod_rank, batch_size, _path, index):
     length = X[0].shape[0]
     batches = _get_number_of_mini_batches(length, batch_size)
     for b in range(batches):
-        f_proc_keras_gen(X, Y, b, batch_size, index, _path)
+        batch_X, batch_Y = _get_batch(X, b, batch_size), _get_batch(Y, b, batch_size)
+        _save_to_pickle(_path, X=batch_X, Y=batch_Y, index=index)
         index += 1
     if (batches * batch_size) < length:  # keep remaining to concatenate with next data element
-        remaining = (get_batch(X, batches, batch_size), get_batch(Y, batches, batch_size))
+        remaining = (_get_batch(X, batches, batch_size), _get_batch(Y, batches, batch_size))
     else:
         remaining = None
     return remaining
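A note on the refactoring above: the batch helpers become private module-level functions (_save_to_pickle, _get_batch, _permute_data, _get_number_of_mini_batches) instead of methods. A plausible motivation is that multiprocessing pickles the work function and its arguments into each worker process; a module-level f_proc is pickled by its qualified name and receives only arrays and primitives, whereas a bound method would drag the whole KerasIterator instance along. A minimal sketch of the dispatch pattern (illustrative only, not the module's code):

import multiprocessing

def _square(x):  # module-level, so it can be pickled by name and shipped to workers
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        results = [pool.apply_async(_square, args=(i,)) for i in range(5)]
        print([r.get() for r in results])  # [0, 1, 4, 9, 16]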
diff --git a/mlair/run_modules/training.py b/mlair/run_modules/training.py
index e5959efb65e442a01e54a4d419c27c9f5b9d32d7..d23d68a823c03a82f88591df5c66e762889f8c93 100644
--- a/mlair/run_modules/training.py
+++ b/mlair/run_modules/training.py
@@ -109,10 +109,11 @@ class Training(RunEnvironment):
         :param mode: name of set, should be from ["train", "val", "test"]
         """
         collection = self.data_store.get("data_collection", mode)
-        kwargs = self.data_store.create_args_dict(["upsampling", "shuffle_batches", "batch_path", "use_multiprocessing", "max_number_multiprocessing"], scope=mode)
-
+        kwargs = self.data_store.create_args_dict(["upsampling", "shuffle_batches", "batch_path", "use_multiprocessing",
+                                                   "max_number_multiprocessing"], scope=mode)
         setattr(self, f"{mode}_set", KerasIterator(collection, self.batch_size, model=self.model, name=mode, **kwargs))
 
+    @TimeTrackingWrapper
     def set_generators(self) -> None:
         """
         Set all generators for training, validation, and testing subsets.
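The diff also moves @TimeTrackingWrapper from the individual batch-preparation methods in iterator.py up to Training.set_generators, so a single measurement now covers building all three KerasIterator sets. TimeTrackingWrapper itself lives in mlair.helpers and its implementation is not part of this diff; as an assumption, it behaves like a standard timing decorator, roughly:

import functools
import logging
import time

def time_tracking_wrapper(func):
    """Hypothetical stand-in for mlair.helpers.TimeTrackingWrapper (actual implementation not shown)."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        try:
            return func(*args, **kwargs)
        finally:
            logging.info(f"{func.__name__} finished after {time.time() - start:.2f} s")
    return wrapper

@time_tracking_wrapper
def set_generators():  # stands in for Training.set_generators
    time.sleep(0.1)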