diff --git a/.gitignore b/.gitignore index 11b7c159fdc450144f4103db66af1eabe42791a0..cec17a77983bc017ef057b020600d15922e61f23 100644 --- a/.gitignore +++ b/.gitignore @@ -58,3 +58,4 @@ htmlcov/ /test/test_modules/data/ report.html /TestExperiment/ +/testrun_network/ diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000000000000000000000000000000000000..92d2159c3b3a3efd7d0c0bfb5bf6bb058697d79c --- /dev/null +++ b/conftest.py @@ -0,0 +1,25 @@ +import os +import shutil + + +def pytest_runtest_teardown(item, nextitem): + """ + Teardown method to clean up folder creations during testing. This method is called after each test, but performs + deletions only after an entire test class was executed. + :param item: tested item + :param nextitem: next item (could be None, if no following test is available) + """ + if nextitem is None or item.cls != nextitem.cls: + # clean up all TestExperiment and data folder that have been created during testing + rel_path = os.path.relpath(item.fspath.dirname, os.path.abspath(__file__)) + path = os.path.dirname(__file__) + for stage in filter(None, rel_path.replace("..", ".").split("/")): + path = os.path.abspath(os.path.join(path, stage)) + list_dir = os.listdir(path) + if "data" in list_dir and path != os.path.dirname(__file__): # do not delete data folder in src + shutil.rmtree(os.path.join(path, "data"), ignore_errors=True) + if "TestExperiment" in list_dir: + shutil.rmtree(os.path.join(path, "TestExperiment"), ignore_errors=True) + else: + pass # nothing to do if next test is from same test class + diff --git a/run.py b/run.py index ea8c04ebde02a80b899a356eb0f7794055abe2d6..e45b2dd6dc3da47c1febe46a387a45014db2772d 100644 --- a/run.py +++ b/run.py @@ -9,14 +9,15 @@ from src.modules.experiment_setup import ExperimentSetup from src.modules.run_environment import RunEnvironment from src.modules.pre_processing import PreProcessing from src.modules.model_setup import ModelSetup -from src.modules.modules import Training, PostProcessing +from src.modules.training import Training +from src.modules.modules import PostProcessing def main(parser_args): with RunEnvironment(): ExperimentSetup(parser_args, stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW001'], - station_type='background') + station_type='background', trainable=True) PreProcessing() ModelSetup() @@ -32,8 +33,8 @@ if __name__ == "__main__": logging.basicConfig(format=formatter, level=logging.INFO) parser = argparse.ArgumentParser() - parser.add_argument('--experiment_date', metavar='--exp_date', type=str, nargs=1, default=None, + parser.add_argument('--experiment_date', metavar='--exp_date', type=str, default=None, help="set experiment date as string") - args = parser.parse_args() + args = parser.parse_args(["--experiment_date", "testrun"]) main(args) diff --git a/src/data_handling/__init__.py b/src/data_handling/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/data_handling/data_distributor.py b/src/data_handling/data_distributor.py new file mode 100644 index 0000000000000000000000000000000000000000..77f83536db5eaed3545d609e1d33a042c7ad23dd --- /dev/null +++ b/src/data_handling/data_distributor.py @@ -0,0 +1,53 @@ +from __future__ import generator_stop +import math + +import keras +import numpy as np + + +class Distributor(keras.utils.Sequence): + + def __init__(self, generator: keras.utils.Sequence, model: keras.models, batch_size: int = 256, + fit_call: bool = True): + 
self.generator = generator + self.model = model + self.batch_size = batch_size + self.fit_call = fit_call + + def _get_model_rank(self): + mod_out = self.model.output_shape + if isinstance(mod_out, tuple): + # only one output branch: (None, ahead) + mod_rank = 1 + elif isinstance(mod_out, list): + # multiple output branches, e.g.: [(None, ahead), (None, ahead)] + mod_rank = len(mod_out) + else: # pragma: no cover + raise TypeError("model output shape must either be tuple or list.") + return mod_rank + + def _get_number_of_mini_batches(self, values): + return math.ceil(values[0].shape[0] / self.batch_size) + + def distribute_on_batches(self, fit_call=True): + while True: + for k, v in enumerate(self.generator): + # get rank of output + mod_rank = self._get_model_rank() + # get number of mini batches + num_mini_batches = self._get_number_of_mini_batches(v) + x_total = np.copy(v[0]) + y_total = np.copy(v[1]) + for prev, curr in enumerate(range(1, num_mini_batches+1)): + x = x_total[prev*self.batch_size:curr*self.batch_size, ...] + y = [y_total[prev*self.batch_size:curr*self.batch_size, ...] for _ in range(mod_rank)] + if x is not None: + yield (x, y) + if (k + 1) == len(self.generator) and curr == num_mini_batches and not fit_call: + return + + def __len__(self): + num_batch = 0 + for _ in self.distribute_on_batches(fit_call=False): + num_batch += 1 + return num_batch diff --git a/src/data_generator.py b/src/data_handling/data_generator.py similarity index 99% rename from src/data_generator.py rename to src/data_handling/data_generator.py index 4e7dda9363c226c5fa92d03f1dbae6470e48d496..1de0ab2092b46dfca6281963b260b1fc6bc65387 100644 --- a/src/data_generator.py +++ b/src/data_handling/data_generator.py @@ -3,7 +3,7 @@ __date__ = '2019-11-07' import keras from src import helpers -from src.data_preparation import DataPrep +from src.data_handling.data_preparation import DataPrep import os from typing import Union, List, Tuple import xarray as xr diff --git a/src/data_preparation.py b/src/data_handling/data_preparation.py similarity index 100% rename from src/data_preparation.py rename to src/data_handling/data_preparation.py diff --git a/src/helpers.py b/src/helpers.py index 1b635a2e9cd00bfbb3650a5a7a8768378cfcdb6b..2ef776898e35a16b0bfd54b5984864c740dbf341 100644 --- a/src/helpers.py +++ b/src/helpers.py @@ -13,6 +13,7 @@ import numpy as np import os import time import socket +import datetime as dt def to_list(arg): @@ -116,7 +117,8 @@ class TimeTracking(object): return time.time() - self.start def __repr__(self): - return f"{round(self._duration(), 2)}s" + # return f"{round(self._duration(), 2)}s" + return f"{dt.timedelta(seconds=math.ceil(self._duration()))} (hh:mm:ss)" def run(self): self._start() @@ -176,11 +178,11 @@ def set_experiment_name(experiment_date=None, experiment_path=None): if experiment_date is None: experiment_name = "TestExperiment" else: - experiment_name = f"{experiment_date}_network/" + experiment_name = f"{experiment_date}_network" if experiment_path is None: experiment_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", experiment_name)) else: - experiment_path = os.path.abspath(experiment_path) + experiment_path = os.path.join(os.path.abspath(experiment_path), experiment_name) return experiment_name, experiment_path diff --git a/src/modules/model_setup.py b/src/modules/model_setup.py index 925d83d733b8cb5715b515856e09e773737778a9..f6c25aff51372f84ede5cda0884fd7c603ffaa6b 100644 --- a/src/modules/model_setup.py +++ b/src/modules/model_setup.py @@ -3,7 +3,7 
@@ __date__ = '2019-12-02' import keras -from keras import losses, layers +from keras import losses from keras.callbacks import ModelCheckpoint from keras.regularizers import l2 from keras.optimizers import Adam, SGD @@ -24,8 +24,10 @@ class ModelSetup(RunEnvironment): # create run framework super().__init__() self.model = None - self.model_name = self.data_store.get("experiment_name", "general") + "model-best.h5" + path = self.data_store.get("experiment_path", "general") + exp_name = self.data_store.get("experiment_name", "general") self.scope = "general.model" + self.checkpoint_name = os.path.join(path, f"{exp_name}_model-best.h5") self._run() def _run(self): @@ -56,25 +58,25 @@ class ModelSetup(RunEnvironment): self.data_store.put("model", self.model, self.scope) def _set_checkpoint(self): - checkpoint = ModelCheckpoint(self.model_name, verbose=1, monitor='val_loss', save_best_only=True, mode='auto') + checkpoint = ModelCheckpoint(self.checkpoint_name, verbose=1, monitor='val_loss', save_best_only=True, mode='auto') self.data_store.put("checkpoint", checkpoint, self.scope) def load_weights(self): try: - logging.debug('reload weights...') - self.model.load_weights(self.model_name) + self.model.load_weights(self.checkpoint_name) + logging.info('reload weights...') except OSError: - logging.debug('no weights to reload...') + logging.info('no weights to reload...') def build_model(self): args_list = ["activation", "window_history_size", "channels", "regularizer", "dropout_rate", "window_lead_time"] args = self.data_store.create_args_dict(args_list, self.scope) - self.model = my_model(**args) + self.model = my_little_model(**args) def plot_model(self): # pragma: no cover with tf.device("/cpu:0"): path = self.data_store.get("experiment_path", "general") - name = self.data_store.get("experiment_name", "general") + "model.pdf" + name = self.data_store.get("experiment_name", "general") + "_model.pdf" file_name = os.path.join(path, name) keras.utils.plot_model(self.model, to_file=file_name, show_shapes=True, show_layer_names=True) @@ -104,11 +106,11 @@ class ModelSetup(RunEnvironment): self.data_store.put("batch_size", int(256), self.scope) # activation - activation = layers.PReLU # ELU #LeakyReLU keras.activations.tanh # + activation = keras.layers.PReLU # ELU #LeakyReLU keras.activations.tanh # self.data_store.put("activation", activation, self.scope) # set los - loss_all = my_loss() + loss_all = my_little_loss() self.data_store.put("loss", loss_all, self.scope) @@ -119,6 +121,29 @@ def my_loss(): return loss_all +def my_little_loss(): + return losses.mean_squared_error + + +def my_little_model(activation, window_history_size, channels, regularizer, dropout_rate, window_lead_time): + + X_input = keras.layers.Input( + shape=(window_history_size + 1, 1, channels)) # add 1 to window_size to include current time step t0 + X_in = keras.layers.Conv2D(32, (1, 1), padding='same', name='{}_Conv_1x1'.format("major"))(X_input) + X_in = activation(name='{}_conv_act'.format("major"))(X_in) + X_in = keras.layers.Flatten(name='{}'.format("major"))(X_in) + X_in = keras.layers.Dropout(dropout_rate, name='{}_Dropout_1'.format("major"))(X_in) + X_in = keras.layers.Dense(64, name='{}_Dense_64'.format("major"))(X_in) + X_in = activation()(X_in) + X_in = keras.layers.Dense(32, name='{}_Dense_32'.format("major"))(X_in) + X_in = activation()(X_in) + X_in = keras.layers.Dense(16, name='{}_Dense_16'.format("major"))(X_in) + X_in = activation()(X_in) + X_in = keras.layers.Dense(window_lead_time, 
name='{}_Dense'.format("major"))(X_in) + out_main = activation()(X_in) + return keras.Model(inputs=X_input, outputs=[out_main]) + + def my_model(activation, window_history_size, channels, regularizer, dropout_rate, window_lead_time): conv_settings_dict1 = { @@ -151,7 +176,7 @@ def my_model(activation, window_history_size, channels, regularizer, dropout_rat ########################################## inception_model = InceptionModelBase() - X_input = layers.Input(shape=(window_history_size + 1, 1, channels)) # add 1 to window_size to include current time step t0 + X_input = keras.layers.Input(shape=(window_history_size + 1, 1, channels)) # add 1 to window_size to include current time step t0 X_in = inception_model.inception_block(X_input, conv_settings_dict1, pool_settings_dict1, regularizer=regularizer, batch_normalisation=True) @@ -159,12 +184,12 @@ def my_model(activation, window_history_size, channels, regularizer, dropout_rat out_minor = flatten_tail(X_in, 'Minor_1', bound_weight=True, activation=activation, dropout_rate=dropout_rate, reduction_filter=4, first_dense=32, window_lead_time=window_lead_time) - X_in = layers.Dropout(dropout_rate)(X_in) + X_in = keras.layers.Dropout(dropout_rate)(X_in) X_in = inception_model.inception_block(X_in, conv_settings_dict2, pool_settings_dict2, regularizer=regularizer, batch_normalisation=True) - X_in = layers.Dropout(dropout_rate)(X_in) + X_in = keras.layers.Dropout(dropout_rate)(X_in) X_in = inception_model.inception_block(X_in, conv_settings_dict3, pool_settings_dict3, regularizer=regularizer, batch_normalisation=True) diff --git a/src/modules/pre_processing.py b/src/modules/pre_processing.py index 764613ea4558bfdef4cbada668f16608f81d5f95..8fad9d1bf756baf830c236f16102878ba83515c2 100644 --- a/src/modules/pre_processing.py +++ b/src/modules/pre_processing.py @@ -3,12 +3,11 @@ __date__ = '2019-11-25' import logging -from typing import Any, Tuple, Dict, List +from typing import Tuple, Dict, List -from src.data_generator import DataGenerator +from src.data_handling.data_generator import DataGenerator from src.helpers import TimeTracking from src.modules.run_environment import RunEnvironment -from src.datastore import NameNotFoundInDataStore, NameNotFoundInScope from src.join import EmptyQueryResult diff --git a/src/modules/training.py b/src/modules/training.py new file mode 100644 index 0000000000000000000000000000000000000000..87dcf35ef6e7d0118ddd9e5f03f99d611da3a70c --- /dev/null +++ b/src/modules/training.py @@ -0,0 +1,125 @@ + +__author__ = "Lukas Leufen, Felix Kleinert" +__date__ = '2019-12-05' + +import logging +import os +import json +import keras + +from src.modules.run_environment import RunEnvironment +from src.data_handling.data_distributor import Distributor + + +class Training(RunEnvironment): + + def __init__(self): + super().__init__() + self.model = self.data_store.get("model", "general.model") + self.train_set = None + self.val_set = None + self.test_set = None + self.batch_size = self.data_store.get("batch_size", "general.model") + self.epochs = self.data_store.get("epochs", "general.model") + self.checkpoint = self.data_store.get("checkpoint", "general.model") + self.lr_sc = self.data_store.get("lr_decay", "general.model") + self.experiment_name = self.data_store.get("experiment_name", "general") + self._run() + + def _run(self) -> None: + """ + Perform training + 1) set_generators(): + set generators for training, validation and testing and distribute according to batch size + 2) make_predict_function(): + create predict function 
before distribution on multiple nodes (detailed information in method description) + 3) train(): + train model and save callbacks + 4) save_model(): + save best model from training as final model + """ + self.set_generators() + self.make_predict_function() + self.train() + self.save_model() + + def make_predict_function(self) -> None: + """ + Creates the predict function. Must be called before distributing. This is necessary, because tf will compile + the predict function just in the moment it is used the first time. This can cause problems, if the model is + distributed on different workers. To prevent this, the function is pre-compiled. See discussion @ + https://stackoverflow.com/questions/40850089/is-keras-thread-safe/43393252#43393252 + """ + self.model._make_predict_function() + + def _set_gen(self, mode: str) -> None: + """ + Set and distribute the generators for given mode regarding batch size + :param mode: name of set, should be from ["train", "val", "test"] + """ + gen = self.data_store.get("generator", f"general.{mode}") + setattr(self, f"{mode}_set", Distributor(gen, self.model, self.batch_size)) + + def set_generators(self) -> None: + """ + Set all generators for training, validation, and testing subsets. The called sub-method will automatically + distribute the data according to the batch size. The subsets can be accessed as class variables train_set, + val_set, and test_set . + """ + for mode in ["train", "val", "test"]: + self._set_gen(mode) + + def train(self) -> None: + """ + Perform training using keras fit_generator(). Callbacks are stored locally in the experiment directory. Best + model from training is saved for class variable model. + """ + logging.info(f"Train with {len(self.train_set)} mini batches.") + history = self.model.fit_generator(generator=self.train_set.distribute_on_batches(), + steps_per_epoch=len(self.train_set), + epochs=self.epochs, + verbose=2, + validation_data=self.val_set.distribute_on_batches(), + validation_steps=len(self.val_set), + callbacks=[self.checkpoint, self.lr_sc]) + self.save_callbacks(history) + self.load_best_model(self.checkpoint.filepath) + + def save_model(self) -> None: + """ + save model in local experiment directory. Model is named as <experiment_name>_my_model.h5 . + """ + path = self.data_store.get("experiment_path", "general") + name = f"{self.data_store.get('experiment_name', 'general')}_my_model.h5" + model_name = os.path.join(path, name) + logging.debug(f"save best model to {model_name}") + self.model.save(model_name) + + def load_best_model(self, name: str) -> None: + """ + Load model weights for model with name. Skip if no weights are available. + :param name: name of the model to load weights for + """ + logging.debug(f"load best model: {name}") + try: + self.model.load_weights(name) + logging.info('reload weights...') + except OSError: + logging.info('no weights to reload...') + + def save_callbacks(self, history: keras.callbacks.History) -> None: + """ + Save callbacks (history, learning rate) of training. 
+ * history.history -> history.json + * lr_sc.lr -> history_lr.json + :param history: history object of training + """ + logging.debug("saving callbacks") + path = self.data_store.get("experiment_path", "general") + with open(os.path.join(path, "history.json"), "w") as f: + json.dump(history.history, f) + with open(os.path.join(path, "history_lr.json"), "w") as f: + json.dump(self.lr_sc.lr, f) + + + diff --git a/test/test_data_handling/test_data_distributor.py b/test/test_data_handling/test_data_distributor.py new file mode 100644 index 0000000000000000000000000000000000000000..cb51f20c8771ec49116731f02c7b462a62405394 --- /dev/null +++ b/test/test_data_handling/test_data_distributor.py @@ -0,0 +1,76 @@ +import math +import os +import shutil + +import keras +import numpy as np +import pytest + +from src.data_handling.data_distributor import Distributor +from src.data_handling.data_generator import DataGenerator +from test.test_modules.test_training import my_test_model + + +class TestDistributor: + + @pytest.fixture + def generator(self): + return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', 'DEBW107', ['o3', 'temp'], + 'datetime', 'variables', 'o3', statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) + + @pytest.fixture + def generator_two_stations(self): + return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', ['DEBW107', 'DEBW013'], + ['o3', 'temp'], 'datetime', 'variables', 'o3', + statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) + + @pytest.fixture + def model(self): + return my_test_model(keras.layers.PReLU, 5, 3, 0.1, False) + + @pytest.fixture + def model_with_minor_branch(self): + return my_test_model(keras.layers.PReLU, 5, 3, 0.1, True) + + @pytest.fixture + def distributor(self, generator, model): + return Distributor(generator, model) + + def test_init_defaults(self, distributor): + assert distributor.batch_size == 256 + assert distributor.fit_call is True + + def test_get_model_rank(self, distributor, model_with_minor_branch): + assert distributor._get_model_rank() == 1 + distributor.model = model_with_minor_branch + assert distributor._get_model_rank() == 2 + distributor.model = 1 + + def test_get_number_of_mini_batches(self, distributor): + values = np.zeros((2, 2311, 19)) + assert distributor._get_number_of_mini_batches(values) == math.ceil(2311 / distributor.batch_size) + + def test_distribute_on_batches_single_loop(self, generator_two_stations, model): + d = Distributor(generator_two_stations, model) + for e in d.distribute_on_batches(fit_call=False): + assert e[0].shape[0] <= d.batch_size + + def test_distribute_on_batches_infinite_loop(self, generator_two_stations, model): + d = Distributor(generator_two_stations, model) + elements = [] + for i, e in enumerate(d.distribute_on_batches()): + if i < len(d): + elements.append(e[0]) + elif i == 2*len(d): # check if all elements are repeated + assert np.testing.assert_array_equal(e[0], elements[i - len(d)]) is None + else: # break when 3rd iteration starts (is called as infinite loop) + break + + def test_len(self, distributor): + assert len(distributor) == math.ceil(len(distributor.generator[0][0]) / 256) + + def test_len_two_stations(self, generator_two_stations, model): + gen = generator_two_stations + d = Distributor(gen, model) + expected = math.ceil(len(gen[0][0]) / 256) + math.ceil(len(gen[1][0]) / 256) + assert len(d) == expected diff --git a/test/test_data_generator.py b/test/test_data_handling/test_data_generator.py similarity index 90% rename from 
test/test_data_generator.py rename to test/test_data_handling/test_data_generator.py index 6801e064be804da0368520ab0ef81bdba8bca2d3..879436afddb8da8d11d6cc585da7c703aa12ef8a 100644 --- a/test/test_data_generator.py +++ b/test/test_data_handling/test_data_generator.py @@ -1,10 +1,17 @@ import pytest import os -from src.data_generator import DataGenerator +import shutil +from src.data_handling.data_generator import DataGenerator class TestDataGenerator: + # @pytest.fixture(autouse=True, scope='module') + # def teardown_module(self): + # yield + # if "data" in os.listdir(os.path.dirname(__file__)): + # shutil.rmtree(os.path.join(os.path.dirname(__file__), "data"), ignore_errors=True) + @pytest.fixture def gen(self): return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', 'DEBW107', ['o3', 'temp'], diff --git a/test/test_data_preparation.py b/test/test_data_handling/test_data_preparation.py similarity index 98% rename from test/test_data_preparation.py rename to test/test_data_handling/test_data_preparation.py index 30f93e6d734885252d2c7a438d6065aa680f32f8..12b619d9e31990f6cc24216ff84ad9d030265e36 100644 --- a/test/test_data_preparation.py +++ b/test/test_data_handling/test_data_preparation.py @@ -1,8 +1,7 @@ import pytest import os -from src.data_preparation import DataPrep +from src.data_handling.data_preparation import DataPrep from src.join import EmptyQueryResult -import logging import numpy as np import xarray as xr import datetime as dt @@ -45,7 +44,7 @@ class TestDataPrep: def test_set_file_name_and_meta(self): d = object.__new__(DataPrep) - d.path = os.path.abspath('test/data/') + d.path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data") d.station = 'TESTSTATION' d.variables = ['a', 'bc'] assert d._set_file_name() == os.path.join(os.path.abspath(os.path.dirname(__file__)), diff --git a/test/test_helpers.py b/test/test_helpers.py index 7de58f331ea4b6eeaf46b6d8b10f8ad1da487f3c..ce5d28a63d63dc4a793e6e07c60f95cb411ae97e 100644 --- a/test/test_helpers.py +++ b/test/test_helpers.py @@ -117,7 +117,7 @@ class TestTimeTracking: t = TimeTracking() t._end() duration = t._duration() - assert t.__repr__().rstrip() == f"{round(duration, 2)}s".rstrip() + assert t.__repr__().rstrip() == f"{dt.timedelta(seconds=math.ceil(duration))} (hh:mm:ss)".rstrip() def test_run(self): t = TimeTracking(start=False) @@ -183,9 +183,9 @@ class TestSetExperimentName: assert exp_name == "TestExperiment" assert exp_path == os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "TestExperiment")) exp_name, exp_path = set_experiment_name(experiment_date="2019-11-14", experiment_path="./test2") - assert exp_name == "2019-11-14_network/" - assert exp_path == os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test2")) + assert exp_name == "2019-11-14_network" + assert exp_path == os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test2", exp_name)) def test_set_experiment_from_sys(self): exp_name, _ = set_experiment_name(experiment_date="2019-11-14") - assert exp_name == "2019-11-14_network/" + assert exp_name == "2019-11-14_network" diff --git a/test/test_modules/test_experiment_setup.py b/test/test_modules/test_experiment_setup.py index e1ec57f9e6cace563b6311ee0c1f34fefcb2c7c2..bfff606ea367f3bb19380078ccecd7db508bb9b1 100644 --- a/test/test_modules/test_experiment_setup.py +++ b/test/test_modules/test_experiment_setup.py @@ -113,8 +113,9 @@ class TestExperimentSetup: assert data_store.get("trainable", "general") is True assert 
data_store.get("fraction_of_training", "general") == 0.5 # set experiment name - assert data_store.get("experiment_name", "general") == "TODAY_network/" - path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "data", "testExperimentFolder")) + assert data_store.get("experiment_name", "general") == "TODAY_network" + path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "data", "testExperimentFolder", + "TODAY_network")) assert data_store.get("experiment_path", "general") == path # setup for data assert data_store.get("var_all_dict", "general") == {'o3': 'dma8eu', 'relhum': 'average_values', diff --git a/test/test_modules/test_model_setup.py b/test/test_modules/test_model_setup.py index 7f8c7a051542ff7fc317c0c92454c28f1d0d70b5..85cb24e3a31af69c8ae9735b4c85173318866339 100644 --- a/test/test_modules/test_model_setup.py +++ b/test/test_modules/test_model_setup.py @@ -1,11 +1,10 @@ -import logging import pytest import os import keras from src.modules.model_setup import ModelSetup from src.modules.run_environment import RunEnvironment -from src.data_generator import DataGenerator +from src.data_handling.data_generator import DataGenerator class TestModelSetup: @@ -34,7 +33,7 @@ class TestModelSetup: def test_set_checkpoint(self, setup): assert "general.modeltest" not in setup.data_store.search_name("checkpoint") - setup.model_name = "TestName" + setup.checkpoint_name = "TestName" setup._set_checkpoint() assert "general.modeltest" in setup.data_store.search_name("checkpoint") diff --git a/test/test_modules/test_pre_processing.py b/test/test_modules/test_pre_processing.py index 41e8729db8841a257b86740edecbbdfd3e0dc910..c884b14657447a50377dc38ec2dea10ba300f4d7 100644 --- a/test/test_modules/test_pre_processing.py +++ b/test/test_modules/test_pre_processing.py @@ -1,22 +1,16 @@ import logging import pytest -import time from src.helpers import PyTestRegex from src.modules.experiment_setup import ExperimentSetup from src.modules.pre_processing import PreProcessing, DEFAULT_ARGS_LIST, DEFAULT_KWARGS_LIST -from src.data_generator import DataGenerator +from src.data_handling.data_generator import DataGenerator from src.datastore import NameNotFoundInScope from src.modules.run_environment import RunEnvironment class TestPreProcessing: - @pytest.fixture - def obj_no_init(self): - yield object.__new__(PreProcessing) - RunEnvironment().__del__() - @pytest.fixture def obj_super_init(self): obj = object.__new__(PreProcessing) @@ -45,8 +39,8 @@ class TestPreProcessing: with PreProcessing(): assert caplog.record_tuples[0] == ('root', 20, 'PreProcessing started') assert caplog.record_tuples[1] == ('root', 20, 'check valid stations started') - assert caplog.record_tuples[-1] == ('root', 20, PyTestRegex(r'run for \d+\.\d+s to check 5 station\(s\). ' - r'Found 5/5 valid stations.')) + assert caplog.record_tuples[-1] == ('root', 20, PyTestRegex(r'run for \d+:\d+:\d+ \(hh:mm:ss\) to check 5 ' + r'station\(s\). Found 5/5 valid stations.')) RunEnvironment().__del__() def test_run(self, obj_with_exp_setup): @@ -94,12 +88,12 @@ class TestPreProcessing: assert len(valid_stations) < len(stations) assert valid_stations == stations[:-1] assert caplog.record_tuples[0] == ('root', 20, 'check valid stations started') - assert caplog.record_tuples[-1] == ('root', 20, PyTestRegex(r'run for \d+\.\d+s to check 6 station\(s\). Found ' - r'5/6 valid stations.')) + assert caplog.record_tuples[-1] == ('root', 20, PyTestRegex(r'run for \d+:\d+:\d+ \(hh:mm:ss\) to check 6 ' + r'station\(s\). 
Found 5/6 valid stations.')) - def test_split_set_indices(self, obj_no_init): + def test_split_set_indices(self, obj_super_init): dummy_list = list(range(0, 15)) - train, val, test = obj_no_init.split_set_indices(len(dummy_list), 0.9) + train, val, test = obj_super_init.split_set_indices(len(dummy_list), 0.9) assert dummy_list[train] == list(range(0, 10)) assert dummy_list[val] == list(range(10, 13)) assert dummy_list[test] == list(range(13, 15)) diff --git a/test/test_modules/test_run_environment.py b/test/test_modules/test_run_environment.py index ce5f995e54df1ffc93c80c191376347c5f0b3741..1eeaa02c530d05e1ceee7bde8811db53ad6042aa 100644 --- a/test/test_modules/test_run_environment.py +++ b/test/test_modules/test_run_environment.py @@ -16,7 +16,8 @@ class TestRunEnvironment: caplog.set_level(logging.INFO) with RunEnvironment() as r: r.do_stuff(0.1) - assert caplog.record_tuples[-1] == ('root', 20, PyTestRegex(r"RunEnvironment finished after \d+\.\d+s")) + expression = PyTestRegex(r"RunEnvironment finished after \d+:\d+:\d+ \(hh:mm:ss\)") + assert caplog.record_tuples[-1] == ('root', 20, expression) def test_init(self, caplog): caplog.set_level(logging.INFO) @@ -28,4 +29,5 @@ class TestRunEnvironment: r = RunEnvironment() r.do_stuff(0.2) del r - assert caplog.record_tuples[-1] == ('root', 20, PyTestRegex(r"RunEnvironment finished after \d+\.\d+s")) + expression = PyTestRegex(r"RunEnvironment finished after \d+:\d+:\d+ \(hh:mm:ss\)") + assert caplog.record_tuples[-1] == ('root', 20, expression) diff --git a/test/test_modules/test_training.py b/test/test_modules/test_training.py new file mode 100644 index 0000000000000000000000000000000000000000..c91cb691464c37456840e2c3150d43f29fc4859b --- /dev/null +++ b/test/test_modules/test_training.py @@ -0,0 +1,191 @@ +import keras +import pytest +from keras.callbacks import ModelCheckpoint, History +import mock +import os +import json +import shutil +import logging + +from src.inception_model import InceptionModelBase +from src.flatten import flatten_tail +from src.modules.training import Training +from src.modules.run_environment import RunEnvironment +from src.data_handling.data_distributor import Distributor +from src.data_handling.data_generator import DataGenerator +from src.helpers import LearningRateDecay, PyTestRegex + + +def my_test_model(activation, window_history_size, channels, dropout_rate, add_minor_branch=False): + inception_model = InceptionModelBase() + conv_settings_dict1 = { + 'tower_1': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (3, 1), 'activation': activation}, + 'tower_2': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (5, 1), 'activation': activation}, } + pool_settings_dict1 = {'pool_kernel': (3, 1), 'tower_filter': 8 * 2, 'activation': activation} + X_input = keras.layers.Input(shape=(window_history_size + 1, 1, channels)) + X_in = inception_model.inception_block(X_input, conv_settings_dict1, pool_settings_dict1) + if add_minor_branch: + out = [flatten_tail(X_in, 'Minor_1', activation=activation)] + else: + out = [] + X_in = keras.layers.Dropout(dropout_rate)(X_in) + out.append(flatten_tail(X_in, 'Main', activation=activation)) + return keras.Model(inputs=X_input, outputs=out) + + +class TestTraining: + + @pytest.fixture + def init_without_run(self, path, model, checkpoint): + obj = object.__new__(Training) + super(Training, obj).__init__() + obj.model = model + obj.train_set = None + obj.val_set = None + obj.test_set = None + obj.batch_size = 256 + obj.epochs = 2 + obj.checkpoint = 
checkpoint + obj.lr_sc = LearningRateDecay() + obj.experiment_name = "TestExperiment" + obj.data_store.put("generator", mock.MagicMock(return_value="mock_train_gen"), "general.train") + obj.data_store.put("generator", mock.MagicMock(return_value="mock_val_gen"), "general.val") + obj.data_store.put("generator", mock.MagicMock(return_value="mock_test_gen"), "general.test") + os.makedirs(path) + obj.data_store.put("experiment_path", path, "general") + obj.data_store.put("experiment_name", "TestExperiment", "general") + yield obj + if os.path.exists(path): + shutil.rmtree(path) + RunEnvironment().__del__() + + @pytest.fixture + def history(self): + h = History() + h.epoch = [0, 1] + h.history = {'val_loss': [0.5586272982587484, 0.45712877659670287], + 'val_mean_squared_error': [0.5586272982587484, 0.45712877659670287], + 'val_mean_absolute_error': [0.595368885413389, 0.530547587585537], + 'loss': [0.6795708956961347, 0.45963566494176616], + 'mean_squared_error': [0.6795708956961347, 0.45963566494176616], + 'mean_absolute_error': [0.6523177288928538, 0.5363963260296364]} + return h + + @pytest.fixture + def path(self): + return os.path.join(os.path.dirname(__file__), "TestExperiment") + + @pytest.fixture + def generator(self, path): + return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', + ['DEBW107'], ['o3', 'temp'], 'datetime', 'variables', + 'o3', statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) + + @pytest.fixture + def model(self): + return my_test_model(keras.layers.PReLU, 7, 2, 0.1, False) + + @pytest.fixture + def checkpoint(self, path): + return ModelCheckpoint(os.path.join(path, "model_checkpoint"), monitor='val_loss', save_best_only=True) + + @pytest.fixture + def ready_to_train(self, generator, init_without_run): + init_without_run.train_set = Distributor(generator, init_without_run.model, init_without_run.batch_size) + init_without_run.val_set = Distributor(generator, init_without_run.model, init_without_run.batch_size) + init_without_run.model.compile(optimizer=keras.optimizers.SGD(), loss=keras.losses.mean_absolute_error) + return init_without_run + + @pytest.fixture + def ready_to_run(self, generator, init_without_run): + obj = init_without_run + obj.data_store.put("generator", generator, "general.train") + obj.data_store.put("generator", generator, "general.val") + obj.data_store.put("generator", generator, "general.test") + obj.model.compile(optimizer=keras.optimizers.SGD(), loss=keras.losses.mean_absolute_error) + return obj + + @pytest.fixture + def ready_to_init(self, generator, model, checkpoint, path): + os.makedirs(path) + obj = RunEnvironment() + obj.data_store.put("generator", generator, "general.train") + obj.data_store.put("generator", generator, "general.val") + obj.data_store.put("generator", generator, "general.test") + model.compile(optimizer=keras.optimizers.SGD(), loss=keras.losses.mean_absolute_error) + obj.data_store.put("model", model, "general.model") + obj.data_store.put("batch_size", 256, "general.model") + obj.data_store.put("epochs", 2, "general.model") + obj.data_store.put("checkpoint", checkpoint, "general.model") + obj.data_store.put("lr_decay", LearningRateDecay(), "general.model") + obj.data_store.put("experiment_name", "TestExperiment", "general") + obj.data_store.put("experiment_path", path, "general") + yield obj + if os.path.exists(path): + shutil.rmtree(path) + + def test_init(self, ready_to_init): + assert isinstance(Training(), Training) # just test, if nothing fails + + def test_run(self, ready_to_run): + 
assert ready_to_run._run() is None # just test, if nothing fails + + def test_make_predict_function(self, init_without_run): + assert hasattr(init_without_run.model, "predict_function") is False + init_without_run.make_predict_function() + assert hasattr(init_without_run.model, "predict_function") + + def test_set_gen(self, init_without_run): + assert init_without_run.train_set is None + init_without_run._set_gen("train") + assert isinstance(init_without_run.train_set, Distributor) + assert init_without_run.train_set.generator.return_value == "mock_train_gen" + + def test_set_generators(self, init_without_run): + sets = ["train", "val", "test"] + assert all([getattr(init_without_run, f"{obj}_set") is None for obj in sets]) + init_without_run.set_generators() + assert not all([getattr(init_without_run, f"{obj}_set") is None for obj in sets]) + assert all([getattr(init_without_run, f"{obj}_set").generator.return_value == f"mock_{obj}_gen" for obj in sets]) + + def test_train(self, ready_to_train): + assert not hasattr(ready_to_train.model, "history") + ready_to_train.train() + assert list(ready_to_train.model.history.history.keys()) == ["val_loss", "loss"] + assert ready_to_train.model.history.epoch == [0, 1] + + def test_save_model(self, init_without_run, path, caplog): + caplog.set_level(logging.DEBUG) + model_name = "TestExperiment_my_model.h5" + assert model_name not in os.listdir(path) + init_without_run.save_model() + assert caplog.record_tuples[0] == ("root", 10, PyTestRegex(f"save best model to {os.path.join(path, model_name)}")) + assert model_name in os.listdir(path) + + def test_load_best_model_no_weights(self, init_without_run, caplog): + caplog.set_level(logging.DEBUG) + init_without_run.load_best_model("notExisting") + assert caplog.record_tuples[0] == ("root", 10, PyTestRegex("load best model: notExisting")) + assert caplog.record_tuples[1] == ("root", 20, PyTestRegex("no weights to reload...")) + + def test_save_callbacks_history_created(self, init_without_run, history, path): + init_without_run.save_callbacks(history) + assert "history.json" in os.listdir(path) + + def test_save_callbacks_lr_created(self, init_without_run, history, path): + init_without_run.save_callbacks(history) + assert "history_lr.json" in os.listdir(path) + + def test_save_callbacks_inspect_history(self, init_without_run, history, path): + init_without_run.save_callbacks(history) + with open(os.path.join(path, "history.json")) as jfile: + hist = json.load(jfile) + assert hist == history.history + + def test_save_callbacks_inspect_lr(self, init_without_run, history, path): + init_without_run.save_callbacks(history) + with open(os.path.join(path, "history_lr.json")) as jfile: + lr = json.load(jfile) + assert lr == init_without_run.lr_sc.lr + +
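
Note on the new Distributor (src/data_handling/data_distributor.py): the data generator yields one full (x, y) array pair per station, and distribute_on_batches slices that pair into mini batches of batch_size along the first axis, repeating the target slice once per model output branch. Below is a minimal, self-contained sketch of that slicing logic for illustration only (plain numpy, hypothetical array shapes, no keras dependency); it mirrors the loop in distribute_on_batches but is not the class itself.

```python
import math
import numpy as np


def slice_into_mini_batches(x_total, y_total, batch_size=256, model_rank=1):
    """Illustrative re-implementation of the slicing in Distributor.distribute_on_batches.

    x_total / y_total stand for the full arrays of a single station; model_rank mirrors
    _get_model_rank() (1 for a single output branch, >1 for multiple branches).
    """
    num_mini_batches = math.ceil(x_total.shape[0] / batch_size)
    for prev, curr in enumerate(range(1, num_mini_batches + 1)):
        x = x_total[prev * batch_size:curr * batch_size, ...]
        # the same target slice is handed to every output branch of the model
        y = [y_total[prev * batch_size:curr * batch_size, ...] for _ in range(model_rank)]
        yield x, y


# hypothetical example: 600 samples and batch_size=256 -> 3 mini batches (256, 256, 88)
x_dummy = np.zeros((600, 8, 1, 2))
y_dummy = np.zeros((600, 3))
sizes = [x.shape[0] for x, y in slice_into_mini_batches(x_dummy, y_dummy)]
assert sizes == [256, 256, 88]
```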
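Note on the changed TimeTracking.__repr__ (src/helpers.py): durations are now reported as a datetime.timedelta rounded up to full seconds, which is what the adjusted log-message regexes in the tests (r"\d+:\d+:\d+ \(hh:mm:ss\)") match. A small formatting sketch, assuming math is already imported in helpers.py (the diff only shows the added datetime import):

```python
import datetime as dt
import math


def format_duration(seconds):
    """Format a duration the way the new TimeTracking.__repr__ does."""
    return f"{dt.timedelta(seconds=math.ceil(seconds))} (hh:mm:ss)"


print(format_duration(0.4))     # 0:00:01 (hh:mm:ss)
print(format_duration(3725.2))  # 1:02:06 (hh:mm:ss)
```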