Commit 03a54c27 authored by Lukas Leufen

add Distributor, expanded Training class

parent 661466cf
2 merge requests: !24 include recent development, !20 not distributed training
Pipeline #26943 passed
@@ -3,7 +3,7 @@ __date__ = '2019-12-02'

import keras
-from keras import losses, layers
+from keras import losses
from keras.callbacks import ModelCheckpoint
from keras.regularizers import l2
from keras.optimizers import Adam, SGD
@@ -104,7 +104,7 @@ class ModelSetup(RunEnvironment):

        self.data_store.put("batch_size", int(256), self.scope)

        # activation
-        activation = layers.PReLU  # alternatives: ELU, LeakyReLU, keras.activations.tanh
+        activation = keras.layers.PReLU  # alternatives: ELU, LeakyReLU, keras.activations.tanh
        self.data_store.put("activation", activation, self.scope)

        # set loss
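
For orientation, the data_store used throughout this commit follows a simple scoped put/get pattern. A minimal sketch of that contract, inferred from the calls in this diff (this is a hypothetical stand-in, not the project's actual implementation):

# hypothetical stand-in; only reproduces the scoped put/get contract
# implied by the data_store calls shown above
class DataStore:
    def __init__(self):
        self._store = {}

    def put(self, key, value, scope):
        self._store[(scope, key)] = value

    def get(self, key, scope):
        return self._store[(scope, key)]

store = DataStore()
store.put("batch_size", int(256), "general.model")
assert store.get("batch_size", "general.model") == 256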
@@ -151,7 +151,7 @@ def my_model(activation, window_history_size, channels, regularizer, dropout_rate,

    ##########################################
    inception_model = InceptionModelBase()
-    X_input = layers.Input(shape=(window_history_size + 1, 1, channels))  # add 1 to window_size to include current time step t0
+    X_input = keras.layers.Input(shape=(window_history_size + 1, 1, channels))  # add 1 to window_size to include current time step t0
    X_in = inception_model.inception_block(X_input, conv_settings_dict1, pool_settings_dict1, regularizer=regularizer,
                                           batch_normalisation=True)
@@ -159,12 +159,12 @@ def my_model(activation, window_history_size, channels, regularizer, dropout_rate,

    out_minor = flatten_tail(X_in, 'Minor_1', bound_weight=True, activation=activation, dropout_rate=dropout_rate,
                             reduction_filter=4, first_dense=32, window_lead_time=window_lead_time)
-    X_in = layers.Dropout(dropout_rate)(X_in)
+    X_in = keras.layers.Dropout(dropout_rate)(X_in)
    X_in = inception_model.inception_block(X_in, conv_settings_dict2, pool_settings_dict2, regularizer=regularizer,
                                           batch_normalisation=True)
-    X_in = layers.Dropout(dropout_rate)(X_in)
+    X_in = keras.layers.Dropout(dropout_rate)(X_in)
    X_in = inception_model.inception_block(X_in, conv_settings_dict3, pool_settings_dict3, regularizer=regularizer,
                                           batch_normalisation=True)
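
Note that my_model ends up with two output branches (out_minor plus the main tail), so its output_shape is a list; that is exactly what Distributor._get_model_rank in the training module below keys on. A minimal illustration with a toy two-headed model (names and shapes are arbitrary, not part of this commit):

import keras

inp = keras.layers.Input(shape=(8,))
minor = keras.layers.Dense(4, name="minor")(inp)
main = keras.layers.Dense(4, name="main")(inp)

two_headed = keras.Model(inputs=inp, outputs=[minor, main])
print(two_headed.output_shape)  # [(None, 4), (None, 4)] -> model rank 2

single = keras.Model(inputs=inp, outputs=main)
print(single.output_shape)  # (None, 4) -> model rank 1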
src/modules/training.py
-__author__ = "Lukas Leufen"
+__author__ = "Lukas Leufen, Felix Kleinert"
__date__ = '2019-12-05'

import keras
import logging
import numpy as np
import math

from src.modules.run_environment import RunEnvironment
@@ -10,6 +15,13 @@ class Training(RunEnvironment):

    def __init__(self):
        super().__init__()
        self.model = self.data_store.get("model", "general.model")
        self.train_generator = None
        self.val_generator = None
        self.test_generator = None
        self.batch_size = self.data_store.get("batch_size", "general.model")
        self.epochs = self.data_store.get("epochs", "general.model")
        self.checkpoint = self.data_store.get("checkpoint", "general.model")
        self.lr_sc = self.data_store.get("lr_decay", "general.model")  # key name assumed; the original read "epochs" again, an apparent copy-paste slip

    def make_predict_function(self):
        # create the predict function before distributing. This is necessary, because tf would otherwise compile the
@@ -17,3 +29,71 @@ class Training(RunEnvironment):
        # predict function separately on different workers. To prevent this, the function is pre-compiled. See discussion @
        # https://stackoverflow.com/questions/40850089/is-keras-thread-safe/43393252#43393252
        self.model._make_predict_function()

    def _set_gen(self, mode):
        gen = self.data_store.get("generator", f"general.{mode}")
        setattr(self, f"{mode}_generator", Distributor(gen, self.model, self.batch_size))

    def set_generators(self):
        # use a plain loop; map() is lazy in Python 3 and would never actually call _set_gen
        for mode in ["train", "val", "test"]:
            self._set_gen(mode)

    def train(self):
        logging.info(f"Train with {len(self.train_generator)} mini batches.")
        history = self.model.fit_generator(generator=self.train_generator.distribute_on_batches(),
                                           steps_per_epoch=len(self.train_generator),
                                           epochs=self.epochs,
                                           verbose=2,
                                           validation_data=self.val_generator.distribute_on_batches(),
                                           validation_steps=len(self.val_generator),
                                           callbacks=[self.checkpoint, self.lr_sc])
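
For orientation, the intended call order is roughly: set_generators() wires one Distributor per data split, make_predict_function() pre-compiles the Keras predict function, and train() drives fit_generator. A minimal driver sketch, assuming a run environment has already filled the data store with the keys mirrored by the get calls above (none of this is part of the commit):

training = Training()             # reads model, batch_size, epochs, ... from the data store
training.set_generators()         # builds the train/val/test Distributor wrappers
training.make_predict_function()  # pre-compile before any distributed use (see comment above)
training.train()                  # fit_generator over distribute_on_batches()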

class Distributor(keras.utils.Sequence):

    def __init__(self, generator: keras.utils.Sequence, model: keras.models.Model, batch_size: int = 256,
                 fit_call: bool = True):
        self.generator = generator
        self.model = model
        self.batch_size = batch_size
        self.fit_call = fit_call

    def _get_model_rank(self):
        mod_out = self.model.output_shape
        if isinstance(mod_out, tuple):
            # only one output branch: (None, ahead)
            mod_rank = 1
        elif isinstance(mod_out, list):
            # multiple output branches, e.g.: [(None, ahead), (None, ahead)]
            mod_rank = len(mod_out)
        else:  # pragma: no branch
            raise TypeError("model output shape must either be tuple or list.")
        return mod_rank

    def _get_number_of_mini_batches(self, values):
        # ceil-divide the sample dimension of x by the target batch size
        return math.ceil(values[0].shape[0] / self.batch_size)

    def distribute_on_batches(self, fit_call=True):
        # loop infinitely over the station-wise generator and re-slice each oversized
        # batch into mini batches of at most self.batch_size samples
        while True:
            for k, v in enumerate(self.generator):
                # get rank of output
                mod_rank = self._get_model_rank()
                # get number of mini batches
                num_mini_batches = self._get_number_of_mini_batches(v)
                x_total = np.copy(v[0])
                y_total = np.copy(v[1])
                for prev, curr in enumerate(range(1, num_mini_batches + 1)):
                    x = x_total[prev * self.batch_size:curr * self.batch_size, ...]
                    y = [y_total[prev * self.batch_size:curr * self.batch_size, ...] for _ in range(mod_rank)]
                    if x is not None:
                        yield (x, y)
                        if (k + 1) == len(self.generator) and curr == num_mini_batches and not fit_call:
                            return  # PEP 479: raising StopIteration inside a generator is a RuntimeError in Python 3.7+

    def __len__(self):
        # with re-batching active, the length is only known by exhausting one full pass
        if self.batch_size > 1:
            num_batch = 0
            for _ in self.distribute_on_batches(fit_call=False):
                num_batch += 1
        else:
            num_batch = len(self.generator)
        return num_batch
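
To see the re-batching in isolation, a self-contained sketch with a stub generator and a single-output model (all names and shapes here are illustrative only, not part of the commit):

import math
import keras
import numpy as np
from src.modules.training import Distributor

class StubGenerator(keras.utils.Sequence):
    """Yields one oversized station batch of 600 samples."""
    def __len__(self):
        return 1
    def __getitem__(self, index):
        return np.zeros((600, 4)), np.zeros((600, 2))

inp = keras.layers.Input(shape=(4,))
model = keras.Model(inputs=inp, outputs=keras.layers.Dense(2)(inp))

d = Distributor(StubGenerator(), model, batch_size=256)
assert len(d) == math.ceil(600 / 256)  # 3 mini batches: 256 + 256 + 88
for x, y in d.distribute_on_batches(fit_call=False):
    assert x.shape[0] <= 256 and len(y) == 1  # rank 1 -> one target array per mini batch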

import pytest
import os
import keras
import math
import numpy as np

from src.modules.training import Distributor
from src.data_generator import DataGenerator
from src.inception_model import InceptionModelBase
from src.flatten import flatten_tail

def my_test_model(activation, window_history_size, channels, dropout_rate, add_minor_branch=False):
    inception_model = InceptionModelBase()
    conv_settings_dict1 = {
        'tower_1': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (3, 1), 'activation': activation},
        'tower_2': {'reduction_filter': 8, 'tower_filter': 8 * 2, 'tower_kernel': (5, 1), 'activation': activation}}
    pool_settings_dict1 = {'pool_kernel': (3, 1), 'tower_filter': 8 * 2, 'activation': activation}
    X_input = keras.layers.Input(shape=(window_history_size + 1, 1, channels))
    X_in = inception_model.inception_block(X_input, conv_settings_dict1, pool_settings_dict1)
    if add_minor_branch:
        out = [flatten_tail(X_in, 'Minor_1', activation=activation)]
    else:
        out = []
    X_in = keras.layers.Dropout(dropout_rate)(X_in)
    out.append(flatten_tail(X_in, 'Main', activation=activation))
    return keras.Model(inputs=X_input, outputs=out)
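
A quick sanity check of the ceil-division behind _get_number_of_mini_batches, which the tests below exercise with 2311 samples (this snippet is illustration only):

import math

samples, batch_size = 2311, 256
# nine full batches of 256 cover 2304 samples; the tenth holds the remaining 7
assert math.ceil(samples / batch_size) == 10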

class TestDistributor:

    @pytest.fixture
    def generator(self):
        return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', 'DEBW107', ['o3', 'temp'],
                             'datetime', 'variables', 'o3', statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})

    @pytest.fixture
    def generator_two_stations(self):
        return DataGenerator(os.path.join(os.path.dirname(__file__), 'data'), 'AIRBASE', ['DEBW107', 'DEBW013'],
                             ['o3', 'temp'], 'datetime', 'variables', 'o3',
                             statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})

    @pytest.fixture
    def model(self):
        return my_test_model(keras.layers.PReLU, 5, 3, 0.1, False)

    @pytest.fixture
    def distributor(self, generator, model):
        return Distributor(generator, model)

    def test_init_defaults(self, distributor):
        assert distributor.batch_size == 256
        assert distributor.fit_call is True

    def test_get_model_rank(self, distributor):
        assert distributor._get_model_rank() == 1
        distributor.model = my_test_model(keras.layers.PReLU, 5, 3, 0.1, True)
        assert distributor._get_model_rank() == 2
        distributor.model = 1
        with pytest.raises(AttributeError):  # an int has no output_shape; the dangling assignment needed an assertion
            distributor._get_model_rank()

    def test_get_number_of_mini_batches(self, distributor):
        values = np.zeros((2, 2311, 19))
        assert distributor._get_number_of_mini_batches(values) == math.ceil(2311 / distributor.batch_size)

    def test_distribute_on_batches(self, generator_two_stations, model):
        d = Distributor(generator_two_stations, model)
        for e in d.distribute_on_batches(fit_call=False):
            assert e[0].shape[0] <= d.batch_size
        elements = []
        for i, e in enumerate(d.distribute_on_batches()):
            if i < len(d):
                elements.append(e[0])
            elif i < 2 * len(d):  # check that the second pass repeats the first pass element-wise
                assert np.testing.assert_array_equal(e[0], elements[i - len(d)]) is None
            else:  # break when the 3rd pass starts (distribute_on_batches loops infinitely)
                break

    def test_len(self, distributor):
        assert len(distributor) == math.ceil(len(distributor.generator[0][0]) / 256)

    def test_len_two_stations(self, generator_two_stations, model):
        gen = generator_two_stations
        d = Distributor(gen, model)
        expected = math.ceil(len(gen[0][0]) / 256) + math.ceil(len(gen[1][0]) / 256)
        assert len(d) == expected