diff --git a/src/model_modules/advanced_paddings.py b/src/model_modules/advanced_paddings.py index 1d48dfc0a87c05183fc5b8b7755f48efaf7b5428..2e2892d8b67e999f3d556403883c905d74a86392 100644 --- a/src/model_modules/advanced_paddings.py +++ b/src/model_modules/advanced_paddings.py @@ -6,6 +6,7 @@ import numpy as np import keras.backend as K from keras.layers.convolutional import _ZeroPadding +from keras.layers import ZeroPadding2D from keras.legacy import interfaces from keras.utils import conv_utils from keras.utils.generic_utils import transpose_shape @@ -109,6 +110,42 @@ class PadUtils: return normalized_padding +class Padding2D: + ''' + This class combines the implemented padding methods. You can call this method by defining a specific padding type. + The __call__ method will return the corresponding Padding layer. + ''' + def __init__(self, padding_type): + self.padding_type = padding_type + self.allowed_paddings = { + **dict.fromkeys(("RefPad2D", "ReflectionPadding2D"), ReflectionPadding2D), + **dict.fromkeys(("SymPad2D", "SymmetricPadding2D"), SymmetricPadding2D), + **dict.fromkeys(("ZeroPad2D", "ZeroPadding2D"), ZeroPadding2D) + } + + def _check_and_get_padding(self): + if isinstance(self.padding_type, str): + try: + pad2d = self.allowed_paddings[self.padding_type] + except KeyError as einfo: + raise NotImplementedError( + f"`{einfo}' is not implemented as padding. " + "Use one of those: i) `RefPad2D', ii) `SymPad2D', iii) `ZeroPad2D'") + else: + if self.padding_type in self.allowed_paddings.values(): + pad2d = self.padding_type + else: + raise TypeError(f"`{self.padding_type.__name__}' is not a valid padding layer type. " + "Use one of those: " + "i) ReflectionPadding2D, ii) SymmetricPadding2D, iii) ZeroPadding2D") + return pad2d + + def __call__(self, *args, **kwargs): + return self._check_and_get_padding()(*args, **kwargs) + + + + class ReflectionPadding2D(_ZeroPadding): """ Reflection padding layer for 2D input. This custum padding layer is built on keras' zero padding layers. Doc is copy @@ -258,7 +295,8 @@ if __name__ == '__main__': kernel_1 = (3, 3) kernel_2 = (5, 5) - x = np.array(range(2000)).reshape(-1, 10, 10, 1) + kernel_3 = (3, 3) + x = np.array(range(2000)).reshape((-1, 10, 10, 1)) y = x.mean(axis=(1, 2)) x_input = Input(shape=x.shape[1:]) @@ -269,6 +307,10 @@ if __name__ == '__main__': pad2 = PadUtils.get_padding_for_same(kernel_size=kernel_2) x_out = SymmetricPadding2D(padding=pad2, name="SymPAD")(x_out) x_out = Conv2D(2, kernel_size=kernel_2, activation='relu')(x_out) + + pad3 = PadUtils.get_padding_for_same(kernel_size=kernel_3) + x_out = Padding2D('RefPad2D')(padding=pad3, name="Padding2D_RefPad")(x_out) + x_out = Conv2D(2, kernel_size=kernel_3, activation='relu')(x_out) x_out = Flatten()(x_out) x_out = Dense(1, activation='linear')(x_out) diff --git a/src/model_modules/inception_model.py b/src/model_modules/inception_model.py index 1cb7656335495f0261abb434e4a203cb4e63887e..6467b3245ad097af6ef17e596f85264eef383d7a 100644 --- a/src/model_modules/inception_model.py +++ b/src/model_modules/inception_model.py @@ -5,7 +5,7 @@ import logging import keras import keras.layers as layers -from src.model_modules.advanced_paddings import PadUtils, ReflectionPadding2D, SymmetricPadding2D +from src.model_modules.advanced_paddings import PadUtils, ReflectionPadding2D, SymmetricPadding2D, Padding2D class InceptionModelBase: @@ -75,7 +75,10 @@ class InceptionModelBase: name=f'Block_{self.number_of_blocks}{self.block_part_name()}_1x1')(input_x) tower = self.act(tower, activation, **act_settings) - tower = self.padding_layer(padding)(padding=padding_size, + # tower = self.padding_layer(padding)(padding=padding_size, + # name=f'Block_{self.number_of_blocks}{self.block_part_name()}_Pad' + # )(tower) + tower = Padding2D(padding)(padding=padding_size, name=f'Block_{self.number_of_blocks}{self.block_part_name()}_Pad' )(tower) @@ -108,28 +111,28 @@ class InceptionModelBase: else: return act_name.__name__ - @staticmethod - def padding_layer(padding): - allowed_paddings = { - 'RefPad2D': ReflectionPadding2D, 'ReflectionPadding2D': ReflectionPadding2D, - 'SymPad2D': SymmetricPadding2D, 'SymmetricPadding2D': SymmetricPadding2D, - 'ZeroPad2D': keras.layers.ZeroPadding2D, 'ZeroPadding2D': keras.layers.ZeroPadding2D - } - if isinstance(padding, str): - try: - pad2d = allowed_paddings[padding] - except KeyError as einfo: - raise NotImplementedError( - f"`{einfo}' is not implemented as padding. " - "Use one of those: i) `RefPad2D', ii) `SymPad2D', iii) `ZeroPad2D'") - else: - if padding in allowed_paddings.values(): - pad2d = padding - else: - raise TypeError(f"`{padding.__name__}' is not a valid padding layer type. " - "Use one of those: " - "i) ReflectionPadding2D, ii) SymmetricPadding2D, iii) ZeroPadding2D") - return pad2d + # @staticmethod + # def padding_layer(padding): + # allowed_paddings = { + # 'RefPad2D': ReflectionPadding2D, 'ReflectionPadding2D': ReflectionPadding2D, + # 'SymPad2D': SymmetricPadding2D, 'SymmetricPadding2D': SymmetricPadding2D, + # 'ZeroPad2D': keras.layers.ZeroPadding2D, 'ZeroPadding2D': keras.layers.ZeroPadding2D + # } + # if isinstance(padding, str): + # try: + # pad2d = allowed_paddings[padding] + # except KeyError as einfo: + # raise NotImplementedError( + # f"`{einfo}' is not implemented as padding. " + # "Use one of those: i) `RefPad2D', ii) `SymPad2D', iii) `ZeroPad2D'") + # else: + # if padding in allowed_paddings.values(): + # pad2d = padding + # else: + # raise TypeError(f"`{padding.__name__}' is not a valid padding layer type. " + # "Use one of those: " + # "i) ReflectionPadding2D, ii) SymmetricPadding2D, iii) ZeroPadding2D") + # return pad2d def create_pool_tower(self, input_x, pool_kernel, tower_filter, activation='relu', max_pooling=True, **kwargs): """ @@ -156,7 +159,8 @@ class InceptionModelBase: block_type = "AvgPool" pooling = layers.AveragePooling2D - tower = self.padding_layer(padding)(padding=padding_size, name=block_name+'Pad')(input_x) + # tower = self.padding_layer(padding)(padding=padding_size, name=block_name+'Pad')(input_x) + tower = Padding2D(padding)(padding=padding_size, name=block_name+'Pad')(input_x) tower = pooling(pool_kernel, strides=(1, 1), padding='valid', name=block_name+block_type)(tower) # convolution block @@ -262,7 +266,7 @@ if __name__ == '__main__': 'padding': 'SymPad2D'}, 'tower_3': {'reduction_filter': 64, 'tower_filter': 64, - 'tower_kernel': (1, 1), + 'tower_kernel': (7, 7), 'activation': ELU, 'padding': ReflectionPadding2D} } diff --git a/src/model_modules/model_class.py b/src/model_modules/model_class.py index ebbd7a25cef9031436d932a6502c9726bfe3e318..7796ea4fbb529d4eaaa819f28804b034db596dfd 100644 --- a/src/model_modules/model_class.py +++ b/src/model_modules/model_class.py @@ -10,6 +10,7 @@ from typing import Any, Callable import keras from src.model_modules.inception_model import InceptionModelBase from src.model_modules.flatten import flatten_tail +from src.model_modules.advanced_paddings import PadUtils, Padding2D class AbstractModelClass(ABC): @@ -351,3 +352,136 @@ class MyTowerModel(AbstractModelClass): """ self.loss = [keras.losses.mean_squared_error] + + +class MyPaperModel(AbstractModelClass): + + def __init__(self, window_history_size, window_lead_time, channels): + + """ + Sets model and loss depending on the given arguments. + :param activation: activation function + :param window_history_size: number of historical time steps included in the input data + :param channels: number of variables used in input data + :param regularizer: <not used here> + :param dropout_rate: dropout rate used in the model [0, 1) + :param window_lead_time: number of time steps to forecast in the output layer + """ + + super().__init__() + + # settings + self.window_history_size = window_history_size + self.window_lead_time = window_lead_time + self.channels = channels + self.dropout_rate = .3 + self.regularizer = keras.regularizers.l2(0.001) + self.initial_lr = 1e-3 + # self.optimizer = keras.optimizers.adam(lr=self.initial_lr, amsgrad=True) + self.optimizer = keras.optimizers.SGD(lr=self.initial_lr, momentum=0.9) + self.lr_decay = src.model_modules.keras_extensions.LearningRateDecay(base_lr=self.initial_lr, drop=.94, epochs_drop=10) + self.epochs = 150 + self.batch_size = int(256 * 2) + self.activation = keras.layers.ELU + self.padding = "SymPad2D" + + # apply to model + self.set_model() + self.set_loss() + + def set_model(self): + + """ + Build the model. + :param activation: activation function + :param window_history_size: number of historical time steps included in the input data + :param channels: number of variables used in input data + :param dropout_rate: dropout rate used in the model [0, 1) + :param window_lead_time: number of time steps to forecast in the output layer + :return: built keras model + """ + activation = self.activation + first_kernel = (3,1) + first_filters = 16 + + conv_settings_dict1 = { + 'tower_1': {'reduction_filter': 8, 'tower_filter': 16 * 2, 'tower_kernel': (3, 1), + 'activation': activation}, + 'tower_2': {'reduction_filter': 8, 'tower_filter': 16 * 2, 'tower_kernel': (5, 1), + 'activation': activation}, + 'tower_3': {'reduction_filter': 8, 'tower_filter': 16 * 2, 'tower_kernel': (1, 1), + 'activation': activation}, + # 'tower_4':{'reduction_filter':8, 'tower_filter':8*2, 'tower_kernel':(7,1), 'activation':activation}, + } + pool_settings_dict1 = {'pool_kernel': (3, 1), 'tower_filter': 16, 'activation': activation} + + conv_settings_dict2 = { + 'tower_1': {'reduction_filter': 64, 'tower_filter': 32 * 2, 'tower_kernel': (3, 1), + 'activation': activation}, + 'tower_2': {'reduction_filter': 64, 'tower_filter': 32 * 2, 'tower_kernel': (5, 1), + 'activation': activation}, + 'tower_3': {'reduction_filter': 64, 'tower_filter': 32 * 2, 'tower_kernel': (1, 1), + 'activation': activation}, + # 'tower_4':{'reduction_filter':8*2, 'tower_filter':16*2, 'tower_kernel':(7,1), 'activation':activation}, + } + pool_settings_dict2 = {'pool_kernel': (3, 1), 'tower_filter': 32, 'activation': activation} + + conv_settings_dict3 = { + 'tower_1': {'reduction_filter': 64 * 2, 'tower_filter': 32 * 4, 'tower_kernel': (3, 1), + 'activation': activation}, + 'tower_2': {'reduction_filter': 64 * 2, 'tower_filter': 32 * 4, 'tower_kernel': (5, 1), + 'activation': activation}, + 'tower_3': {'reduction_filter': 64 * 2, 'tower_filter': 32 * 4, 'tower_kernel': (1, 1), + 'activation': activation}, + # 'tower_4':{'reduction_filter':16*4, 'tower_filter':32, 'tower_kernel':(7,1), 'activation':activation}, + } + pool_settings_dict3 = {'pool_kernel': (3, 1), 'tower_filter': 32, 'activation': activation} + + ########################################## + inception_model = InceptionModelBase() + + X_input = keras.layers.Input( + shape=(self.window_history_size + 1, 1, self.channels)) # add 1 to window_size to include current time step t0 + + pad_size = PadUtils.get_padding_for_same(first_kernel) + # X_in = adv_pad.SymmetricPadding2D(padding=pad_size)(X_input) + # X_in = inception_model.padding_layer("SymPad2D")(padding=pad_size, name="SymPad")(X_input) # adv_pad.SymmetricPadding2D(padding=pad_size)(X_input) + X_in = Padding2D("SymPad2D")(padding=pad_size, name="SymPad")(X_input) + X_in = keras.layers.Conv2D(filters=first_filters, + kernel_size=first_kernel, + kernel_regularizer=self.regularizer, + name="First_conv_{}x{}".format(first_kernel[0], first_kernel[1]))(X_in) + X_in = self.activation(name='FirstAct')(X_in) + + + X_in = inception_model.inception_block(X_in, conv_settings_dict1, pool_settings_dict1, + regularizer=self.regularizer, + batch_normalisation=True, + padding=self.padding) + out_minor1 = flatten_tail(X_in, 'minor_1', False, self.dropout_rate, self.window_lead_time, + self.activation, 32, 64) + + X_in = keras.layers.Dropout(self.dropout_rate)(X_in) + + X_in = inception_model.inception_block(X_in, conv_settings_dict2, pool_settings_dict2, regularizer=self.regularizer, + batch_normalisation=True, padding=self.padding) + + # X_in = keras.layers.Dropout(self.dropout_rate)(X_in) + # + # X_in = inception_model.inception_block(X_in, conv_settings_dict3, pool_settings_dict3, regularizer=self.regularizer, + # batch_normalisation=True) + ############################################# + + out_main = flatten_tail(X_in, 'Main', activation=activation, bound_weight=False, dropout_rate=self.dropout_rate, + reduction_filter=64 * 2, first_dense=64 * 2, window_lead_time=self.window_lead_time) + + self.model = keras.Model(inputs=X_input, outputs=[out_minor1, out_main]) + + def set_loss(self): + + """ + Set the loss + :return: loss function + """ + + self.loss = [keras.losses.mean_squared_error, keras.losses.mean_squared_error] diff --git a/src/run_modules/model_setup.py b/src/run_modules/model_setup.py index 32ca0d2e82af32d8164d80ac42731e10f431a458..307fd63018df1e4825fa8fbee1fb07f6c8fef67e 100644 --- a/src/run_modules/model_setup.py +++ b/src/run_modules/model_setup.py @@ -12,6 +12,7 @@ from src.model_modules.keras_extensions import HistoryAdvanced, CallbackHandler # from src.model_modules.model_class import MyBranchedModel as MyModel from src.model_modules.model_class import MyLittleModel as MyModel # from src.model_modules.model_class import MyTowerModel as MyModel +# from src.model_modules.model_class import MyPaperModel as MyModel from src.run_modules.run_environment import RunEnvironment diff --git a/test/test_model_modules/test_advanced_paddings.py b/test/test_model_modules/test_advanced_paddings.py index 5282eb6df34d4d395dbbdd1fd76fd71a95e9c8df..bbeaf1c745a63b3607062b0c4052088c9af06b92 100644 --- a/test/test_model_modules/test_advanced_paddings.py +++ b/test/test_model_modules/test_advanced_paddings.py @@ -417,3 +417,61 @@ class TestSymmerticPadding2D: sym_pad = SymmetricPadding2D(padding=pad, name=layer_name)(input_x) assert sym_pad.get_shape().as_list() == [None, 12, 10, 3] assert sym_pad.name == 'SymPad_3x1/MirrorPad:0' + + +class TestPadding2D: + + @pytest.fixture + def input_x(self): + return keras.Input(shape=(32, 32, 3)) + + def test_init(self): + padding_layer = Padding2D('SymPad2D') + assert padding_layer.padding_type == 'SymPad2D' + assert padding_layer.allowed_paddings == { + 'RefPad2D': ReflectionPadding2D, 'ReflectionPadding2D': ReflectionPadding2D, + 'SymPad2D': SymmetricPadding2D, 'SymmetricPadding2D': SymmetricPadding2D, + 'ZeroPad2D': ZeroPadding2D, 'ZeroPadding2D': ZeroPadding2D + } + + + def test_check_and_get_padding_zero_padding(self): + assert Padding2D('ZeroPad2D')._check_and_get_padding() == ZeroPadding2D + assert Padding2D('ZeroPadding2D')._check_and_get_padding() == ZeroPadding2D + assert Padding2D(keras.layers.ZeroPadding2D)._check_and_get_padding() == ZeroPadding2D + + def test_check_and_get_padding_sym_padding(self): + assert Padding2D('SymPad2D')._check_and_get_padding() == SymmetricPadding2D + assert Padding2D('SymmetricPadding2D')._check_and_get_padding() == SymmetricPadding2D + assert Padding2D(SymmetricPadding2D)._check_and_get_padding() == SymmetricPadding2D + + def test_check_and_get_padding_ref_padding(self): + assert Padding2D('RefPad2D')._check_and_get_padding() == ReflectionPadding2D + assert Padding2D('ReflectionPadding2D')._check_and_get_padding() == ReflectionPadding2D + assert Padding2D(ReflectionPadding2D)._check_and_get_padding() == ReflectionPadding2D + + def test_check_and_get_padding_raises(self,): + with pytest.raises(NotImplementedError) as einfo: + Padding2D('FalsePadding2D')._check_and_get_padding() + assert "`'FalsePadding2D'' is not implemented as padding. " \ + "Use one of those: i) `RefPad2D', ii) `SymPad2D', iii) `ZeroPad2D'" in str(einfo.value) + with pytest.raises(TypeError) as einfo: + Padding2D(keras.layers.Conv2D)._check_and_get_padding() + assert "`Conv2D' is not a valid padding layer type. Use one of those: "\ + "i) ReflectionPadding2D, ii) SymmetricPadding2D, iii) ZeroPadding2D" in str(einfo.value) + + @pytest.mark.parametrize("pad_type", ["SymPad2D", "SymmetricPadding2D", SymmetricPadding2D, + "RefPad2D", "ReflectionPadding2D", ReflectionPadding2D, + "ZeroPad2D", "ZeroPadding2D", ZeroPadding2D]) + def test_call(self, pad_type, input_x): + pd = Padding2D(pad_type) + if hasattr(pad_type, "__name__"): + layer_name = pad_type.__name__ + else: + layer_name = pad_type + pd_ap = pd(padding=(1,2), name=f"{layer_name}_layer")(input_x) + assert pd_ap._keras_history[0].input_shape == (None, 32, 32, 3) + assert pd_ap._keras_history[0].output_shape == (None, 34, 36, 3) + assert pd_ap._keras_history[0].padding == ((1, 1), (2, 2)) + assert pd_ap._keras_history[0].name == f"{layer_name}_layer" + diff --git a/test/test_model_modules/test_inception_model.py b/test/test_model_modules/test_inception_model.py index 9dee30788c34cd8d1a7572947ea2e568ac2006b7..e5e92158425a73c5af1c6d1623d970e1037bbd80 100644 --- a/test/test_model_modules/test_inception_model.py +++ b/test/test_model_modules/test_inception_model.py @@ -277,48 +277,3 @@ class TestInceptionModelBase: bn = base.batch_normalisation(input_x)._keras_history[0] assert isinstance(bn, keras.layers.normalization.BatchNormalization) assert bn.name == "Block_0a_BN" - - def test_padding_layer_zero_padding(self, base, input_x): - padding_size = ((1, 1), (0, 0)) - zp = base.padding_layer('ZeroPad2D') - assert zp == keras.layers.convolutional.ZeroPadding2D - assert base.padding_layer('ZeroPadding2D') == keras.layers.convolutional.ZeroPadding2D - assert base.padding_layer(keras.layers.ZeroPadding2D) == keras.layers.convolutional.ZeroPadding2D - assert zp.__name__ == 'ZeroPadding2D' - zp_ap = zp(padding=padding_size)(input_x) - assert zp_ap._keras_history[0].padding == ((1, 1), (0, 0)) - - def test_padding_layer_sym_padding(self, base, input_x): - padding_size = ((1, 1), (0, 0)) - zp = base.padding_layer('SymPad2D') - assert zp == SymmetricPadding2D - assert base.padding_layer('SymmetricPadding2D') == SymmetricPadding2D - assert base.padding_layer(SymmetricPadding2D) == SymmetricPadding2D - assert zp.__name__ == 'SymmetricPadding2D' - zp_ap = zp(padding=padding_size)(input_x) - assert zp_ap._keras_history[0].padding == ((1, 1), (0, 0)) - - def test_padding_layer_ref_padding(self, base, input_x): - padding_size = ((1, 1), (0, 0)) - zp = base.padding_layer('RefPad2D') - assert zp == ReflectionPadding2D - assert base.padding_layer('ReflectionPadding2D') == ReflectionPadding2D - assert base.padding_layer(ReflectionPadding2D) == ReflectionPadding2D - assert zp.__name__ == 'ReflectionPadding2D' - zp_ap = zp(padding=padding_size)(input_x) - assert zp_ap._keras_history[0].padding == ((1, 1), (0, 0)) - - def test_padding_layer_raises(self, base, input_x): - with pytest.raises(NotImplementedError) as einfo: - base.padding_layer('FalsePadding2D') - assert "`'FalsePadding2D'' is not implemented as padding. " \ - "Use one of those: i) `RefPad2D', ii) `SymPad2D', iii) `ZeroPad2D'" in str(einfo.value) - with pytest.raises(TypeError) as einfo: - base.padding_layer(keras.layers.Conv2D) - assert "`Conv2D' is not a valid padding layer type. Use one of those: "\ - "i) ReflectionPadding2D, ii) SymmetricPadding2D, iii) ZeroPadding2D" in str(einfo.value) - - - - - diff --git a/test/test_model_modules/test_model_class.py b/test/test_model_modules/test_model_class.py index 0dbd2d9b67a0748bf09eb4f59e1888aae1ea405d..13f982b80906d8d5d6beae7075b23f4c84d6edd1 100644 --- a/test/test_model_modules/test_model_class.py +++ b/test/test_model_modules/test_model_class.py @@ -2,6 +2,7 @@ import keras import pytest from src.model_modules.model_class import AbstractModelClass +from src.model_modules.model_class import MyPaperModel, MyTowerModel, MyLittleModel, MyBranchedModel class TestAbstractModelClass: @@ -27,3 +28,35 @@ class TestAbstractModelClass: assert hasattr(amc, "compile") is True assert hasattr(amc.model, "compile") is True assert amc.compile == amc.model.compile + + +class TestMyPaperModel: + + @pytest.fixture + def mpm(self): + return MyPaperModel(window_history_size=6, window_lead_time=4, channels=9) + + def test_init(self, mpm): + # check if loss number of loss functions fit to model outputs + # same loss fkts. for all tails or different fkts. per tail + if isinstance(mpm.model.output_shape, list): + assert (callable(mpm.loss) or (len(mpm.loss) == 1)) or (len(mpm.loss) == len(mpm.model.output_shape)) + elif isinstance(mpm.model.output_shape, tuple): + assert callable(mpm.loss) or (len(mpm.loss) == 1) + + def test_set_model(self, mpm): + assert isinstance(mpm.model, keras.Model) + assert mpm.model.layers[0].output_shape == (None, 7, 1, 9) + # check output dimensions + if isinstance(mpm.model.output_shape, tuple): + assert mpm.model.output_shape == (None, 4) + elif isinstance(mpm.model.output_shape, list): + for tail_shape in mpm.model.output_shape: + assert tail_shape == (None, 4) + else: + raise TypeError(f"Type of model.output_shape as to be a tuple (one tail)" + f" or a list of tuples (multiple tails). Received: {type(mpm.model.output_shape)}") + + def test_set_loss(self, mpm): + assert callable(mpm.loss) or (len(mpm.loss) > 0) +