From 8fda782b0156ae6e1e069dd35aa02eb26075e808 Mon Sep 17 00:00:00 2001 From: lukas leufen <l.leufen@fz-juelich.de> Date: Fri, 6 Dec 2019 12:12:13 +0100 Subject: [PATCH] restructured some modules, introduced "data_handling" for preparator, generator and distributor --- src/data_handling/__init__.py | 0 src/data_handling/data_distributor.py | 55 +++++++++++++++ src/{ => data_handling}/data_generator.py | 2 +- src/{ => data_handling}/data_preparation.py | 0 src/modules/pre_processing.py | 5 +- src/modules/training.py | 55 +-------------- .../test_data_distributor.py | 68 +++++++++++++++++++ .../test_data_generator.py | 2 +- .../test_data_preparation.py | 3 +- test/test_modules/test_model_setup.py | 3 +- test/test_modules/test_pre_processing.py | 3 +- test/test_modules/test_training.py | 65 ------------------ 12 files changed, 132 insertions(+), 129 deletions(-) create mode 100644 src/data_handling/__init__.py create mode 100644 src/data_handling/data_distributor.py rename src/{ => data_handling}/data_generator.py (99%) rename src/{ => data_handling}/data_preparation.py (100%) create mode 100644 test/test_data_handling/test_data_distributor.py rename test/{ => test_data_handling}/test_data_generator.py (98%) rename test/{ => test_data_handling}/test_data_preparation.py (99%) diff --git a/src/data_handling/__init__.py b/src/data_handling/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/data_handling/data_distributor.py b/src/data_handling/data_distributor.py new file mode 100644 index 00000000..33550cbf --- /dev/null +++ b/src/data_handling/data_distributor.py @@ -0,0 +1,55 @@ +import math + +import keras +import numpy as np + + +class Distributor(keras.utils.Sequence): + + def __init__(self, generator: keras.utils.Sequence, model: keras.models, batch_size: int = 256, + fit_call: bool = True): + self.generator = generator + self.model = model + self.batch_size = batch_size + self.fit_call = fit_call + + def _get_model_rank(self): + mod_out = self.model.output_shape + if isinstance(mod_out, tuple): + # only one output branch: (None, ahead) + mod_rank = 1 + elif isinstance(mod_out, list): + # multiple output branches, e.g.: [(None, ahead), (None, ahead)] + mod_rank = len(mod_out) + else: # pragma: no branch + raise TypeError("model output shape must either be tuple or list.") + return mod_rank + + def _get_number_of_mini_batches(self, values): + return math.ceil(values[0].shape[0] / self.batch_size) + + def distribute_on_batches(self, fit_call=True): + while True: + for k, v in enumerate(self.generator): + # get rank of output + mod_rank = self._get_model_rank() + # get number of mini batches + num_mini_batches = self._get_number_of_mini_batches(v) + x_total = np.copy(v[0]) + y_total = np.copy(v[1]) + for prev, curr in enumerate(range(1, num_mini_batches+1)): + x = x_total[prev*self.batch_size:curr*self.batch_size, ...] + y = [y_total[prev*self.batch_size:curr*self.batch_size, ...] for _ in range(mod_rank)] + if x is not None: + yield (x, y) + if (k + 1) == len(self.generator) and curr == num_mini_batches and not fit_call: + raise StopIteration + + def __len__(self): + if self.batch_size > 1: + num_batch = 0 + for _ in self.distribute_on_batches(fit_call=False): + num_batch += 1 + else: + num_batch = len(self.generator) + return num_batch diff --git a/src/data_generator.py b/src/data_handling/data_generator.py similarity index 99% rename from src/data_generator.py rename to src/data_handling/data_generator.py index 4e7dda93..1de0ab20 100644 --- a/src/data_generator.py +++ b/src/data_handling/data_generator.py @@ -3,7 +3,7 @@ __date__ = '2019-11-07' import keras from src import helpers -from src.data_preparation import DataPrep +from src.data_handling.data_preparation import DataPrep import os from typing import Union, List, Tuple import xarray as xr diff --git a/src/data_preparation.py b/src/data_handling/data_preparation.py similarity index 100% rename from src/data_preparation.py rename to src/data_handling/data_preparation.py diff --git a/src/modules/pre_processing.py b/src/modules/pre_processing.py index 764613ea..8fad9d1b 100644 --- a/src/modules/pre_processing.py +++ b/src/modules/pre_processing.py @@ -3,12 +3,11 @@ __date__ = '2019-11-25' import logging -from typing import Any, Tuple, Dict, List +from typing import Tuple, Dict, List -from src.data_generator import DataGenerator +from src.data_handling.data_generator import DataGenerator from src.helpers import TimeTracking from src.modules.run_environment import RunEnvironment -from src.datastore import NameNotFoundInDataStore, NameNotFoundInScope from src.join import EmptyQueryResult diff --git a/src/modules/training.py b/src/modules/training.py index 2f61e35d..866e9405 100644 --- a/src/modules/training.py +++ b/src/modules/training.py @@ -1,13 +1,11 @@ + __author__ = "Lukas Leufen, Felix Kleinert" __date__ = '2019-12-05' - -import keras import logging -import numpy as np -import math from src.modules.run_environment import RunEnvironment +from src.data_handling.data_distributor import Distributor class Training(RunEnvironment): @@ -48,52 +46,3 @@ class Training(RunEnvironment): callbacks=[self.checkpoint, self.lr_sc]) -class Distributor(keras.utils.Sequence): - - def __init__(self, generator: keras.utils.Sequence, model: keras.models, batch_size: int = 256, - fit_call: bool = True): - self.generator = generator - self.model = model - self.batch_size = batch_size - self.fit_call = fit_call - - def _get_model_rank(self): - mod_out = self.model.output_shape - if isinstance(mod_out, tuple): - # only one output branch: (None, ahead) - mod_rank = 1 - elif isinstance(mod_out, list): - # multiple output branches, e.g.: [(None, ahead), (None, ahead)] - mod_rank = len(mod_out) - else: # pragma: no branch - raise TypeError("model output shape must either be tuple or list.") - return mod_rank - - def _get_number_of_mini_batches(self, values): - return math.ceil(values[0].shape[0] / self.batch_size) - - def distribute_on_batches(self, fit_call=True): - while True: - for k, v in enumerate(self.generator): - # get rank of output - mod_rank = self._get_model_rank() - # get number of mini batches - num_mini_batches = self._get_number_of_mini_batches(v) - x_total = np.copy(v[0]) - y_total = np.copy(v[1]) - for prev, curr in enumerate(range(1, num_mini_batches+1)): - x = x_total[prev*self.batch_size:curr*self.batch_size, ...] - y = [y_total[prev*self.batch_size:curr*self.batch_size, ...] for _ in range(mod_rank)] - if x is not None: - yield (x, y) - if (k + 1) == len(self.generator) and curr == num_mini_batches and not fit_call: - raise StopIteration - - def __len__(self): - if self.batch_size > 1: - num_batch = 0 - for _ in self.distribute_on_batches(fit_call=False): - num_batch += 1 - else: - num_batch = len(self.generator) - return num_batch diff --git a/test/test_data_handling/test_data_distributor.py b/test/test_data_handling/test_data_distributor.py new file mode 100644 index 00000000..894c086e --- /dev/null +++ b/test/test_data_handling/test_data_distributor.py @@ -0,0 +1,68 @@ +import math +import os + +import keras +import numpy as np +import pytest + +from src.data_handling.data_distributor import Distributor +from src.data_handling.data_generator import DataGenerator +from test.test_modules.test_training import my_test_model + + +class TestDistributor: + + @pytest.fixture + def generator(self): + return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', 'DEBW107', ['o3', 'temp'], + 'datetime', 'variables', 'o3', statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) + + @pytest.fixture + def generator_two_stations(self): + return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', ['DEBW107', 'DEBW013'], + ['o3', 'temp'], 'datetime', 'variables', 'o3', + statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) + + @pytest.fixture + def model(self): + return my_test_model(keras.layers.PReLU, 5, 3, 0.1, False) + + @pytest.fixture + def distributor(self, generator, model): + return Distributor(generator, model) + + def test_init_defaults(self, distributor): + assert distributor.batch_size == 256 + assert distributor.fit_call is True + + def test_get_model_rank(self, distributor): + assert distributor._get_model_rank() == 1 + distributor.model = my_test_model(keras.layers.PReLU, 5, 3, 0.1, True) + assert distributor._get_model_rank() == 2 + distributor.model = 1 + + def test_get_number_of_mini_batches(self, distributor): + values = np.zeros((2, 2311, 19)) + assert distributor._get_number_of_mini_batches(values) == math.ceil(2311 / distributor.batch_size) + + def test_distribute_on_batches(self, generator_two_stations, model): + d = Distributor(generator_two_stations, model) + for e in d.distribute_on_batches(fit_call=False): + assert e[0].shape[0] <= d.batch_size + elements = [] + for i, e in enumerate(d.distribute_on_batches()): + if i < len(d): + elements.append(e[0]) + elif i == 2*len(d): # check if all elements are repeated + assert np.testing.assert_array_equal(e[0], elements[i - len(d)]) is None + else: # break when 3rd iteration starts (is called as infinite loop) + break + + def test_len(self, distributor): + assert len(distributor) == math.ceil(len(distributor.generator[0][0]) / 256) + + def test_len_two_stations(self, generator_two_stations, model): + gen = generator_two_stations + d = Distributor(gen, model) + expected = math.ceil(len(gen[0][0]) / 256) + math.ceil(len(gen[1][0]) / 256) + assert len(d) == expected \ No newline at end of file diff --git a/test/test_data_generator.py b/test/test_data_handling/test_data_generator.py similarity index 98% rename from test/test_data_generator.py rename to test/test_data_handling/test_data_generator.py index 6801e064..b08145e9 100644 --- a/test/test_data_generator.py +++ b/test/test_data_handling/test_data_generator.py @@ -1,6 +1,6 @@ import pytest import os -from src.data_generator import DataGenerator +from src.data_handling.data_generator import DataGenerator class TestDataGenerator: diff --git a/test/test_data_preparation.py b/test/test_data_handling/test_data_preparation.py similarity index 99% rename from test/test_data_preparation.py rename to test/test_data_handling/test_data_preparation.py index 30f93e6d..32d1937a 100644 --- a/test/test_data_preparation.py +++ b/test/test_data_handling/test_data_preparation.py @@ -1,8 +1,7 @@ import pytest import os -from src.data_preparation import DataPrep +from src.data_handling.data_preparation import DataPrep from src.join import EmptyQueryResult -import logging import numpy as np import xarray as xr import datetime as dt diff --git a/test/test_modules/test_model_setup.py b/test/test_modules/test_model_setup.py index 7f8c7a05..5c8223ee 100644 --- a/test/test_modules/test_model_setup.py +++ b/test/test_modules/test_model_setup.py @@ -1,11 +1,10 @@ -import logging import pytest import os import keras from src.modules.model_setup import ModelSetup from src.modules.run_environment import RunEnvironment -from src.data_generator import DataGenerator +from src.data_handling.data_generator import DataGenerator class TestModelSetup: diff --git a/test/test_modules/test_pre_processing.py b/test/test_modules/test_pre_processing.py index 13abe62a..ca5502e2 100644 --- a/test/test_modules/test_pre_processing.py +++ b/test/test_modules/test_pre_processing.py @@ -1,11 +1,10 @@ import logging import pytest -import time from src.helpers import PyTestRegex from src.modules.experiment_setup import ExperimentSetup from src.modules.pre_processing import PreProcessing, DEFAULT_ARGS_LIST, DEFAULT_KWARGS_LIST -from src.data_generator import DataGenerator +from src.data_handling.data_generator import DataGenerator from src.datastore import NameNotFoundInScope from src.modules.run_environment import RunEnvironment diff --git a/test/test_modules/test_training.py b/test/test_modules/test_training.py index d37d3e46..590bb380 100644 --- a/test/test_modules/test_training.py +++ b/test/test_modules/test_training.py @@ -1,11 +1,5 @@ -import pytest -import os import keras -import math -import numpy as np -from src.modules.training import Distributor -from src.data_generator import DataGenerator from src.inception_model import InceptionModelBase from src.flatten import flatten_tail @@ -25,62 +19,3 @@ def my_test_model(activation, window_history_size, channels, dropout_rate, add_m X_in = keras.layers.Dropout(dropout_rate)(X_in) out.append(flatten_tail(X_in, 'Main', activation=activation)) return keras.Model(inputs=X_input, outputs=out) - - -class TestDistributor: - - @pytest.fixture - def generator(self): - return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', 'DEBW107', ['o3', 'temp'], - 'datetime', 'variables', 'o3', statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) - - @pytest.fixture - def generator_two_stations(self): - return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', ['DEBW107', 'DEBW013'], - ['o3', 'temp'], 'datetime', 'variables', 'o3', - statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) - - @pytest.fixture - def model(self): - return my_test_model(keras.layers.PReLU, 5, 3, 0.1, False) - - @pytest.fixture - def distributor(self, generator, model): - return Distributor(generator, model) - - def test_init_defaults(self, distributor): - assert distributor.batch_size == 256 - assert distributor.fit_call is True - - def test_get_model_rank(self, distributor): - assert distributor._get_model_rank() == 1 - distributor.model = my_test_model(keras.layers.PReLU, 5, 3, 0.1, True) - assert distributor._get_model_rank() == 2 - distributor.model = 1 - - def test_get_number_of_mini_batches(self, distributor): - values = np.zeros((2, 2311, 19)) - assert distributor._get_number_of_mini_batches(values) == math.ceil(2311 / distributor.batch_size) - - def test_distribute_on_batches(self, generator_two_stations, model): - d = Distributor(generator_two_stations, model) - for e in d.distribute_on_batches(fit_call=False): - assert e[0].shape[0] <= d.batch_size - elements = [] - for i, e in enumerate(d.distribute_on_batches()): - if i < len(d): - elements.append(e[0]) - elif i == 2*len(d): # check if all elements are repeated - assert np.testing.assert_array_equal(e[0], elements[i - len(d)]) is None - else: # break when 3rd iteration starts (is called as infinite loop) - break - - def test_len(self, distributor): - assert len(distributor) == math.ceil(len(distributor.generator[0][0]) / 256) - - def test_len_two_stations(self, generator_two_stations, model): - gen = generator_two_stations - d = Distributor(gen, model) - expected = math.ceil(len(gen[0][0]) / 256) + math.ceil(len(gen[1][0]) / 256) - assert len(d) == expected - -- GitLab