diff --git a/src/data_handling/__init__.py b/src/data_handling/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/data_handling/data_distributor.py b/src/data_handling/data_distributor.py new file mode 100644 index 0000000000000000000000000000000000000000..33550cbf5029320e2f9fe26687eb5022258cf516 --- /dev/null +++ b/src/data_handling/data_distributor.py @@ -0,0 +1,55 @@ +import math + +import keras +import numpy as np + + +class Distributor(keras.utils.Sequence): + + def __init__(self, generator: keras.utils.Sequence, model: keras.models, batch_size: int = 256, + fit_call: bool = True): + self.generator = generator + self.model = model + self.batch_size = batch_size + self.fit_call = fit_call + + def _get_model_rank(self): + mod_out = self.model.output_shape + if isinstance(mod_out, tuple): + # only one output branch: (None, ahead) + mod_rank = 1 + elif isinstance(mod_out, list): + # multiple output branches, e.g.: [(None, ahead), (None, ahead)] + mod_rank = len(mod_out) + else: # pragma: no branch + raise TypeError("model output shape must either be tuple or list.") + return mod_rank + + def _get_number_of_mini_batches(self, values): + return math.ceil(values[0].shape[0] / self.batch_size) + + def distribute_on_batches(self, fit_call=True): + while True: + for k, v in enumerate(self.generator): + # get rank of output + mod_rank = self._get_model_rank() + # get number of mini batches + num_mini_batches = self._get_number_of_mini_batches(v) + x_total = np.copy(v[0]) + y_total = np.copy(v[1]) + for prev, curr in enumerate(range(1, num_mini_batches+1)): + x = x_total[prev*self.batch_size:curr*self.batch_size, ...] + y = [y_total[prev*self.batch_size:curr*self.batch_size, ...] 
for _ in range(mod_rank)] + if x is not None: + yield (x, y) + if (k + 1) == len(self.generator) and curr == num_mini_batches and not fit_call: + return + + def __len__(self): + if self.batch_size > 1: + num_batch = 0 + for _ in self.distribute_on_batches(fit_call=False): + num_batch += 1 + else: + num_batch = len(self.generator) + return num_batch diff --git a/src/data_generator.py b/src/data_handling/data_generator.py similarity index 99% rename from src/data_generator.py rename to src/data_handling/data_generator.py index 4e7dda9363c226c5fa92d03f1dbae6470e48d496..1de0ab2092b46dfca6281963b260b1fc6bc65387 100644 --- a/src/data_generator.py +++ b/src/data_handling/data_generator.py @@ -3,7 +3,7 @@ __date__ = '2019-11-07' import keras from src import helpers -from src.data_preparation import DataPrep +from src.data_handling.data_preparation import DataPrep import os from typing import Union, List, Tuple import xarray as xr diff --git a/src/data_preparation.py b/src/data_handling/data_preparation.py similarity index 100% rename from src/data_preparation.py rename to src/data_handling/data_preparation.py diff --git a/src/modules/pre_processing.py b/src/modules/pre_processing.py index 764613ea4558bfdef4cbada668f16608f81d5f95..8fad9d1bf756baf830c236f16102878ba83515c2 100644 --- a/src/modules/pre_processing.py +++ b/src/modules/pre_processing.py @@ -3,12 +3,11 @@ __date__ = '2019-11-25' import logging -from typing import Any, Tuple, Dict, List +from typing import Tuple, Dict, List -from src.data_generator import DataGenerator +from src.data_handling.data_generator import DataGenerator from src.helpers import TimeTracking from src.modules.run_environment import RunEnvironment -from src.datastore import NameNotFoundInDataStore, NameNotFoundInScope from src.join import EmptyQueryResult diff --git a/src/modules/training.py b/src/modules/training.py index 2f61e35d536b18f5511b9bf457c5a78f74784a21..866e9405acec35d4602a1ca6b079fdc53a05b71f 100644 --- 
a/src/modules/training.py +++ b/src/modules/training.py @@ -1,13 +1,11 @@ + __author__ = "Lukas Leufen, Felix Kleinert" __date__ = '2019-12-05' - -import keras import logging -import numpy as np -import math from src.modules.run_environment import RunEnvironment +from src.data_handling.data_distributor import Distributor class Training(RunEnvironment): @@ -48,52 +46,3 @@ class Training(RunEnvironment): callbacks=[self.checkpoint, self.lr_sc]) -class Distributor(keras.utils.Sequence): - - def __init__(self, generator: keras.utils.Sequence, model: keras.models, batch_size: int = 256, - fit_call: bool = True): - self.generator = generator - self.model = model - self.batch_size = batch_size - self.fit_call = fit_call - - def _get_model_rank(self): - mod_out = self.model.output_shape - if isinstance(mod_out, tuple): - # only one output branch: (None, ahead) - mod_rank = 1 - elif isinstance(mod_out, list): - # multiple output branches, e.g.: [(None, ahead), (None, ahead)] - mod_rank = len(mod_out) - else: # pragma: no branch - raise TypeError("model output shape must either be tuple or list.") - return mod_rank - - def _get_number_of_mini_batches(self, values): - return math.ceil(values[0].shape[0] / self.batch_size) - - def distribute_on_batches(self, fit_call=True): - while True: - for k, v in enumerate(self.generator): - # get rank of output - mod_rank = self._get_model_rank() - # get number of mini batches - num_mini_batches = self._get_number_of_mini_batches(v) - x_total = np.copy(v[0]) - y_total = np.copy(v[1]) - for prev, curr in enumerate(range(1, num_mini_batches+1)): - x = x_total[prev*self.batch_size:curr*self.batch_size, ...] - y = [y_total[prev*self.batch_size:curr*self.batch_size, ...] 
for _ in range(mod_rank)] - if x is not None: - yield (x, y) - if (k + 1) == len(self.generator) and curr == num_mini_batches and not fit_call: - raise StopIteration - - def __len__(self): - if self.batch_size > 1: - num_batch = 0 - for _ in self.distribute_on_batches(fit_call=False): - num_batch += 1 - else: - num_batch = len(self.generator) - return num_batch diff --git a/test/test_data_handling/test_data_distributor.py b/test/test_data_handling/test_data_distributor.py new file mode 100644 index 0000000000000000000000000000000000000000..894c086eccb0683cb4480be761503b242bb788bb --- /dev/null +++ b/test/test_data_handling/test_data_distributor.py @@ -0,0 +1,68 @@ +import math +import os + +import keras +import numpy as np +import pytest + +from src.data_handling.data_distributor import Distributor +from src.data_handling.data_generator import DataGenerator +from test.test_modules.test_training import my_test_model + + +class TestDistributor: + + @pytest.fixture + def generator(self): + return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', 'DEBW107', ['o3', 'temp'], + 'datetime', 'variables', 'o3', statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) + + @pytest.fixture + def generator_two_stations(self): + return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', ['DEBW107', 'DEBW013'], + ['o3', 'temp'], 'datetime', 'variables', 'o3', + statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) + + @pytest.fixture + def model(self): + return my_test_model(keras.layers.PReLU, 5, 3, 0.1, False) + + @pytest.fixture + def distributor(self, generator, model): + return Distributor(generator, model) + + def test_init_defaults(self, distributor): + assert distributor.batch_size == 256 + assert distributor.fit_call is True + + def test_get_model_rank(self, distributor): + assert distributor._get_model_rank() == 1 + distributor.model = my_test_model(keras.layers.PReLU, 5, 3, 0.1, True) + assert distributor._get_model_rank() 
== 2 + distributor.model = 1 + + def test_get_number_of_mini_batches(self, distributor): + values = np.zeros((2, 2311, 19)) + assert distributor._get_number_of_mini_batches(values) == math.ceil(2311 / distributor.batch_size) + + def test_distribute_on_batches(self, generator_two_stations, model): + d = Distributor(generator_two_stations, model) + for e in d.distribute_on_batches(fit_call=False): + assert e[0].shape[0] <= d.batch_size + elements = [] + for i, e in enumerate(d.distribute_on_batches()): + if i < len(d): + elements.append(e[0]) + elif i == 2*len(d): # check if all elements are repeated + assert np.testing.assert_array_equal(e[0], elements[i - len(d)]) is None + else: # break when 3rd iteration starts (is called as infinite loop) + break + + def test_len(self, distributor): + assert len(distributor) == math.ceil(len(distributor.generator[0][0]) / 256) + + def test_len_two_stations(self, generator_two_stations, model): + gen = generator_two_stations + d = Distributor(gen, model) + expected = math.ceil(len(gen[0][0]) / 256) + math.ceil(len(gen[1][0]) / 256) + assert len(d) == expected \ No newline at end of file diff --git a/test/test_data_generator.py b/test/test_data_handling/test_data_generator.py similarity index 98% rename from test/test_data_generator.py rename to test/test_data_handling/test_data_generator.py index 6801e064be804da0368520ab0ef81bdba8bca2d3..b08145e987278a7ab19a3b4e3567541e91f432c5 100644 --- a/test/test_data_generator.py +++ b/test/test_data_handling/test_data_generator.py @@ -1,6 +1,6 @@ import pytest import os -from src.data_generator import DataGenerator +from src.data_handling.data_generator import DataGenerator class TestDataGenerator: diff --git a/test/test_data_preparation.py b/test/test_data_handling/test_data_preparation.py similarity index 99% rename from test/test_data_preparation.py rename to test/test_data_handling/test_data_preparation.py index 
30f93e6d734885252d2c7a438d6065aa680f32f8..32d1937ae0fe217b2adaa5a07bf0051b6098af6d 100644 --- a/test/test_data_preparation.py +++ b/test/test_data_handling/test_data_preparation.py @@ -1,8 +1,7 @@ import pytest import os -from src.data_preparation import DataPrep +from src.data_handling.data_preparation import DataPrep from src.join import EmptyQueryResult -import logging import numpy as np import xarray as xr import datetime as dt diff --git a/test/test_modules/test_model_setup.py b/test/test_modules/test_model_setup.py index 7f8c7a051542ff7fc317c0c92454c28f1d0d70b5..5c8223eefebf303733488f08a519208627a2bd91 100644 --- a/test/test_modules/test_model_setup.py +++ b/test/test_modules/test_model_setup.py @@ -1,11 +1,10 @@ -import logging import pytest import os import keras from src.modules.model_setup import ModelSetup from src.modules.run_environment import RunEnvironment -from src.data_generator import DataGenerator +from src.data_handling.data_generator import DataGenerator class TestModelSetup: diff --git a/test/test_modules/test_pre_processing.py b/test/test_modules/test_pre_processing.py index 13abe62a2b9199ad8d92528ff5363bd54f1be221..ca5502e2bf579bd2bf0082433783ec15b72baf4c 100644 --- a/test/test_modules/test_pre_processing.py +++ b/test/test_modules/test_pre_processing.py @@ -1,11 +1,10 @@ import logging import pytest -import time from src.helpers import PyTestRegex from src.modules.experiment_setup import ExperimentSetup from src.modules.pre_processing import PreProcessing, DEFAULT_ARGS_LIST, DEFAULT_KWARGS_LIST -from src.data_generator import DataGenerator +from src.data_handling.data_generator import DataGenerator from src.datastore import NameNotFoundInScope from src.modules.run_environment import RunEnvironment diff --git a/test/test_modules/test_training.py b/test/test_modules/test_training.py index d37d3e466003f93a0e9497dabcf76e4e74c19797..590bb38018835243163d1efb1e0634fd4d6e8b2e 100644 --- a/test/test_modules/test_training.py +++ 
b/test/test_modules/test_training.py @@ -1,11 +1,5 @@ -import pytest -import os import keras -import math -import numpy as np -from src.modules.training import Distributor -from src.data_generator import DataGenerator from src.inception_model import InceptionModelBase from src.flatten import flatten_tail @@ -25,62 +19,3 @@ def my_test_model(activation, window_history_size, channels, dropout_rate, add_m X_in = keras.layers.Dropout(dropout_rate)(X_in) out.append(flatten_tail(X_in, 'Main', activation=activation)) return keras.Model(inputs=X_input, outputs=out) - - -class TestDistributor: - - @pytest.fixture - def generator(self): - return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', 'DEBW107', ['o3', 'temp'], - 'datetime', 'variables', 'o3', statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) - - @pytest.fixture - def generator_two_stations(self): - return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', ['DEBW107', 'DEBW013'], - ['o3', 'temp'], 'datetime', 'variables', 'o3', - statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) - - @pytest.fixture - def model(self): - return my_test_model(keras.layers.PReLU, 5, 3, 0.1, False) - - @pytest.fixture - def distributor(self, generator, model): - return Distributor(generator, model) - - def test_init_defaults(self, distributor): - assert distributor.batch_size == 256 - assert distributor.fit_call is True - - def test_get_model_rank(self, distributor): - assert distributor._get_model_rank() == 1 - distributor.model = my_test_model(keras.layers.PReLU, 5, 3, 0.1, True) - assert distributor._get_model_rank() == 2 - distributor.model = 1 - - def test_get_number_of_mini_batches(self, distributor): - values = np.zeros((2, 2311, 19)) - assert distributor._get_number_of_mini_batches(values) == math.ceil(2311 / distributor.batch_size) - - def test_distribute_on_batches(self, generator_two_stations, model): - d = Distributor(generator_two_stations, model) - for e in 
d.distribute_on_batches(fit_call=False): - assert e[0].shape[0] <= d.batch_size - elements = [] - for i, e in enumerate(d.distribute_on_batches()): - if i < len(d): - elements.append(e[0]) - elif i == 2*len(d): # check if all elements are repeated - assert np.testing.assert_array_equal(e[0], elements[i - len(d)]) is None - else: # break when 3rd iteration starts (is called as infinite loop) - break - - def test_len(self, distributor): - assert len(distributor) == math.ceil(len(distributor.generator[0][0]) / 256) - - def test_len_two_stations(self, generator_two_stations, model): - gen = generator_two_stations - d = Distributor(gen, model) - expected = math.ceil(len(gen[0][0]) / 256) + math.ceil(len(gen[1][0]) / 256) - assert len(d) == expected -