diff --git a/README.md b/README.md index 585628e504ad806fba65be4f1b68cdfab260b40f..329b1f16ffc1f38ddc78408a5d04199b69bf7dc8 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,9 @@ # MachineLearningTools -This is a collection of all relevant functions used for ML stuff in the ESDE group \ No newline at end of file +This is a collection of all relevant functions used for ML stuff in the ESDE group + +## Inception Model + +See a description [here](https://towardsdatascience.com/a-simple-guide-to-the-versions-of-the-inception-network-7fc52b863202) +or take a look on the papers [Going Deeper with Convolutions (Szegedy et al., 2014)](https://arxiv.org/abs/1409.4842) +and [Network In Network (Lin et al., 2014)](https://arxiv.org/abs/1312.4400). \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 753c6e7f74687102959923303c80a8090bba8907..a956e8ff221dd8425a3cad3df4a45c334d58a040 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ Keras==2.2.4 numpy==1.15.4 tensorflow==1.12.0 +pytest==5.2.1 +pydot \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/inception_model.py b/src/inception_model.py index 8ffbb3d5109bbc723804e42f6a34d0e222b3ceb4..09e032ca70a8e63d9459464e968d25fac29cd4f1 100644 --- a/src/inception_model.py +++ b/src/inception_model.py @@ -1,11 +1,8 @@ __author__ = 'Felix Kleinert, Lukas Leufen' +__date__ = '2019-10-22' import keras -from keras.layers import Input, Dense, Conv2D, MaxPooling2D, AveragePooling2D, ZeroPadding2D, Dropout, Flatten, \ - Concatenate, Reshape, Activation -from keras.models import Model -from keras.regularizers import l2 -from keras.optimizers import SGD +import keras.layers as layers class InceptionModelBase: @@ -16,13 +13,8 @@ class InceptionModelBase: def __init__(self): self.number_of_blocks = 0 self.part_of_block = 0 - # conversion between chr 
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-22'

import keras
import keras.layers as layers


class InceptionModelBase:
    """
    Building blocks to assemble inception blocks in the keras functional API
    (see Szegedy et al., 2014, "Going Deeper with Convolutions").
    Layer names are generated as Block_<block#><tower-letter>_<suffix>.
    """

    def __init__(self):
        self.number_of_blocks = 0  # incremented once per inception_block() call
        self.part_of_block = 0     # tower counter within the current block ('a', 'b', ...)
        self.act_number = 0        # activation counter within the current tower
        self.ord_base = 96  # set to 96 as always add +1 for new part of block, chr(97)='a'

    def block_part_name(self):
        """Return the letter ('a', 'b', ...) identifying the current tower of the block."""
        return chr(self.ord_base + self.part_of_block)

    def batch_normalisation(self, input_x, **kwargs):
        """Apply a uniquely named BatchNormalization layer to input_x."""
        block_name = f"Block_{self.number_of_blocks}{self.block_part_name()}_BN"
        return layers.BatchNormalization(name=block_name, **kwargs)(input_x)

    def create_conv_tower(self,
                          input_x,
                          reduction_filter,
                          tower_filter,
                          tower_kernel,
                          activation='relu',
                          batch_normalisation=False,
                          **kwargs):
        """
        This function creates a "convolution tower block" containing a 1x1 convolution to reduce filter size followed by
        convolution with given filter and kernel size.
        :param input_x: Input to network part
        :param reduction_filter: Number of filters used in 1x1 convolution to reduce overall filter size before conv.
        :param tower_filter: Number of filters for n x m convolution
        :param tower_kernel: kernel size for convolution (n, m)
        :param activation: activation function for convolution (string or keras layer class)
        :param batch_normalisation: apply batch normalisation before the final activation (default False)
        :return: output tensor of the tower
        """
        self.part_of_block += 1
        self.act_number = 1
        regularizer = kwargs.get('regularizer', keras.regularizers.l2(0.01))
        bn_settings = kwargs.get('bn_settings', {})
        act_settings = kwargs.get('act_settings', {})
        print(f'Inception Block with activation: {activation}')

        block_name = f'Block_{self.number_of_blocks}{self.block_part_name()}_{tower_kernel[0]}x{tower_kernel[1]}'

        if tower_kernel == (1, 1):
            # a 1x1 tower needs no extra reduction stage
            tower = layers.Conv2D(tower_filter,
                                  tower_kernel,
                                  padding='same',
                                  kernel_regularizer=regularizer,
                                  name=block_name)(input_x)
            tower = self.act(tower, activation, **act_settings)
        else:
            # 1x1 reduction convolution first, then the n x m convolution
            tower = layers.Conv2D(reduction_filter,
                                  (1, 1),
                                  padding='same',
                                  kernel_regularizer=regularizer,
                                  name=f'Block_{self.number_of_blocks}{self.block_part_name()}_1x1')(input_x)
            tower = self.act(tower, activation, **act_settings)

            tower = layers.Conv2D(tower_filter,
                                  tower_kernel,
                                  padding='same',
                                  kernel_regularizer=regularizer,
                                  name=block_name)(tower)
            if batch_normalisation:
                tower = self.batch_normalisation(tower, **bn_settings)
            tower = self.act(tower, activation, **act_settings)
        return tower

    def act(self, input_x, activation, **act_settings):
        """
        Apply a uniquely named activation layer. `activation` may be a string
        (mapped to an advanced-activation layer class if one exists, otherwise a
        plain Activation layer) or an activation layer class itself.
        """
        block_name = f"Block_{self.number_of_blocks}{self.block_part_name()}_act_{self.act_number}"
        try:
            out = getattr(layers, self._get_act_name(activation))(**act_settings, name=block_name)(input_x)
        except AttributeError:
            # no dedicated layer class -> fall back to a generic Activation layer
            block_name += f"_{activation.lower()}"
            out = layers.Activation(activation.lower(), name=block_name)(input_x)
        self.act_number += 1
        return out

    @staticmethod
    def _get_act_name(act_name):
        """Map an activation (string or layer class) to the keras layer class name."""
        if isinstance(act_name, str):
            mapping = {'relu': 'ReLU', 'prelu': 'PReLU', 'elu': 'ELU'}
            return mapping.get(act_name.lower(), act_name)
        else:
            return act_name.__name__

    def create_pool_tower(self, input_x, pool_kernel, tower_filter, activation='relu', max_pooling=True, **kwargs):
        """
        This function creates a "pooling tower block": pooling followed by a 1x1 convolution.
        :param input_x: Input to network part
        :param pool_kernel: size of pooling kernel
        :param tower_filter: Number of filters used in 1x1 convolution to reduce filter size
        :param activation: activation applied after the 1x1 convolution
        :param max_pooling: use MaxPooling2D if True, AveragePooling2D otherwise
        :return: output tensor of the tower
        """
        self.part_of_block += 1
        self.act_number = 1
        act_settings = kwargs.get('act_settings', {})

        # pooling block
        block_name = f"Block_{self.number_of_blocks}{self.block_part_name()}_"
        if max_pooling:
            block_type = "MaxPool"
            pooling = layers.MaxPooling2D
        else:
            block_type = "AvgPool"
            pooling = layers.AveragePooling2D
        tower = pooling(pool_kernel, strides=(1, 1), padding='same', name=block_name + block_type)(input_x)

        # convolution block
        # BUG FIX: the original used bare `Conv2D`, which is only imported inside the
        # __main__ guard below -> NameError whenever this module is imported (as the
        # test suite does). Use the module-level `layers` alias instead.
        tower = layers.Conv2D(tower_filter, (1, 1), padding='same', name=block_name + "1x1")(tower)
        tower = self.act(tower, activation, **act_settings)
        return tower

    def inception_block(self, input_x, tower_conv_parts, tower_pool_parts, **kwargs):
        """
        Create an inception block.
        :param input_x: Input to block
        :param tower_conv_parts: dict containing settings for parts of inception block; Example:
            tower_conv_parts = {'tower_1': {'reduction_filter': 32,
                                            'tower_filter': 64,
                                            'tower_kernel': (3, 3),
                                            'activation': 'relu'}}
        :param tower_pool_parts: dict with 'pool_kernel', 'tower_filter' and optionally 'max_pooling' (bool);
            if 'max_pooling' is absent, both a max- and an avg-pooling tower are built
        :return: concatenation of all tower outputs along the channel axis
        :raises AttributeError: if 'max_pooling' is given but not a bool
        """
        self.number_of_blocks += 1
        self.part_of_block = 0
        tower_build = {}
        for part, part_settings in tower_conv_parts.items():
            tower_build[part] = self.create_conv_tower(input_x, **part_settings, **kwargs)
        if 'max_pooling' in tower_pool_parts.keys():
            max_pooling = tower_pool_parts.get('max_pooling')
            if not isinstance(max_pooling, bool):
                raise AttributeError(f"max_pooling has to be either a bool or empty. Given was: {max_pooling}")
            pool_name = '{}pool'.format('max' if max_pooling else 'avg')
            tower_build[pool_name] = self.create_pool_tower(input_x, **tower_pool_parts, **kwargs)
        else:
            tower_build['maxpool'] = self.create_pool_tower(input_x, **tower_pool_parts, **kwargs)
            tower_build['avgpool'] = self.create_pool_tower(input_x, **tower_pool_parts, **kwargs, max_pooling=False)

        block = keras.layers.concatenate(list(tower_build.values()), axis=3)
        return block


if __name__ == '__main__':
    print(__name__)
    from keras.datasets import cifar10
    from keras.utils import np_utils
    from keras.layers import Input
    from keras.layers.advanced_activations import LeakyReLU
    from keras.optimizers import SGD
    from keras.layers import Dense, Flatten, Conv2D, MaxPooling2D
    from keras.models import Model

    # network settings
    conv_settings_dict = {'tower_1': {'reduction_filter': 64,
                                      'tower_filter': 64,
                                      'tower_kernel': (3, 3),
                                      'activation': LeakyReLU},
                          'tower_2': {'reduction_filter': 64,
                                      'tower_filter': 64,
                                      'tower_kernel': (5, 5),
                                      'activation': 'relu'}
                          }
    pool_settings_dict = {'pool_kernel': (3, 3),
                          'tower_filter': 64,
                          'activation': 'relu'}

    # load data
    (X_train, y_train), (X_test, y_test) = cifar10.load_data()
    X_train = X_train.astype('float32')
    X_test = X_test.astype('float32')
    # NOTE(review): the original contains additional preprocessing between these
    # lines (hidden between diff hunks) — presumably /255 scaling of X and
    # to_categorical of y_train; verify against the repository.
    X_train = X_train / 255.0
    X_test = X_test / 255.0
    y_train = np_utils.to_categorical(y_train)
    y_test = np_utils.to_categorical(y_test)
    input_img = Input(shape=(32, 32, 3))

    # create inception net
    inception_net = InceptionModelBase()
    output = inception_net.inception_block(input_img, conv_settings_dict, pool_settings_dict)
    output = Flatten()(output)
    output = Dense(10, activation='softmax')(output)
    model = Model(inputs=input_img, outputs=output)
    print(model.summary())

    # compile
    epochs = 10
    lrate = 0.01
    decay = lrate / epochs
    sgd = SGD(lr=lrate, momentum=0.9, decay=decay, nesterov=False)
    # NOTE(review): the mangled diff is ambiguous about whether model.compile was
    # removed; it is kept here so the SGD optimizer above is not dead code.
    model.compile(loss='categorical_crossentropy', optimizer=sgd, metrics=['accuracy'])
    # requires pydot (added to requirements.txt in this change)
    keras.utils.plot_model(model, to_file='model.pdf', show_shapes=True, show_layer_names=True)
import pytest
from src.inception_model import InceptionModelBase
import keras
import tensorflow as tf


class TestInceptionModelBase:
    """Checks layer wiring and the generated layer names of InceptionModelBase."""

    @pytest.fixture
    def base(self):
        return InceptionModelBase()

    @pytest.fixture
    def input_x(self):
        return keras.Input(shape=(32, 32, 3))

    @staticmethod
    def step_in(element, depth=1):
        # walk `depth` layers backwards through the keras graph
        for _ in range(depth):
            element = element.input._keras_history[0]
        return element

    def test_init(self, base):
        assert base.number_of_blocks == 0
        assert base.part_of_block == 0
        assert base.ord_base == 96
        assert base.act_number == 0

    def test_block_part_name(self, base):
        assert base.block_part_name() == chr(96)
        base.part_of_block += 1
        assert base.block_part_name() == 'a'

    def test_create_conv_tower_3x3(self, base, input_x):
        opts = {'input_x': input_x, 'reduction_filter': 64, 'tower_filter': 32, 'tower_kernel': (3, 3)}
        tower = base.create_conv_tower(**opts)
        # the tower ends with its second activation
        assert base.part_of_block == 1
        assert tower.name == 'Block_0a_act_2/Relu:0'
        activation = tower._keras_history[0]
        assert isinstance(activation, keras.layers.advanced_activations.ReLU)
        assert activation.name == "Block_0a_act_2"
        # one step back: the 3x3 convolution
        main_conv = self.step_in(activation)
        assert isinstance(main_conv, keras.layers.Conv2D)
        assert main_conv.filters == 32
        assert main_conv.padding == 'same'
        assert main_conv.kernel_size == (3, 3)
        assert main_conv.strides == (1, 1)
        assert main_conv.name == "Block_0a_3x3"
        # one step back: the activation after the reduction stage
        reduction_act = self.step_in(main_conv)
        assert isinstance(reduction_act, keras.layers.advanced_activations.ReLU)
        assert reduction_act.name == "Block_0a_act_1"
        # one step back: the 1x1 reduction convolution fed by the model input
        reduction_conv = self.step_in(reduction_act)
        assert isinstance(reduction_conv, keras.layers.Conv2D)
        assert reduction_conv.filters == 64
        assert reduction_conv.kernel_size == (1, 1)
        assert reduction_conv.padding == 'same'
        assert reduction_conv.name == 'Block_0a_1x1'
        assert reduction_conv.input._keras_shape == (None, 32, 32, 3)

    def test_create_conv_tower_3x3_activation(self, base, input_x):
        opts = {'input_x': input_x, 'reduction_filter': 64, 'tower_filter': 32, 'tower_kernel': (3, 3)}
        # a string without a dedicated layer class falls back to Activation
        tower = base.create_conv_tower(activation='tanh', **opts)
        assert tower.name == 'Block_0a_act_2_tanh/Tanh:0'
        activation = tower._keras_history[0]
        assert isinstance(activation, keras.layers.core.Activation)
        assert activation.name == "Block_0a_act_2_tanh"
        # an activation layer class is used directly
        tower = base.create_conv_tower(activation=keras.layers.LeakyReLU, **opts)
        assert tower.name == 'Block_0b_act_2/LeakyRelu:0'
        activation = tower._keras_history[0]
        assert isinstance(activation, keras.layers.advanced_activations.LeakyReLU)
        assert activation.name == "Block_0b_act_2"

    def test_create_conv_tower_1x1(self, base, input_x):
        opts = {'input_x': input_x, 'reduction_filter': 64, 'tower_filter': 32, 'tower_kernel': (1, 1)}
        tower = base.create_conv_tower(**opts)
        # a 1x1 tower has a single conv + activation, no reduction stage
        assert base.part_of_block == 1
        assert tower.name == 'Block_0a_act_1_1/Relu:0'
        activation = tower._keras_history[0]
        assert isinstance(activation, keras.layers.advanced_activations.ReLU)
        assert activation.name == "Block_0a_act_1"
        only_conv = self.step_in(activation)
        assert isinstance(only_conv, keras.layers.Conv2D)
        assert only_conv.filters == 32
        assert only_conv.padding == 'same'
        assert only_conv.kernel_size == (1, 1)
        assert only_conv.strides == (1, 1)
        assert only_conv.name == "Block_0a_1x1"
        assert only_conv.input._keras_shape == (None, 32, 32, 3)

    def test_create_conv_towers(self, base, input_x):
        opts = {'input_x': input_x, 'reduction_filter': 64, 'tower_filter': 32, 'tower_kernel': (3, 3)}
        _ = base.create_conv_tower(**opts)
        tower = base.create_conv_tower(**opts)
        # the second tower gets the next letter
        assert base.part_of_block == 2
        assert tower.name == 'Block_0b_act_2_1/Relu:0'

    def test_create_pool_tower(self, base, input_x):
        opts = {'input_x': input_x, 'pool_kernel': (3, 3), 'tower_filter': 32}
        tower = base.create_pool_tower(**opts)
        # the tower ends with the activation of the 1x1 convolution
        assert base.part_of_block == 1
        assert tower.name == 'Block_0a_act_1_3/Relu:0'
        activation = tower._keras_history[0]
        assert isinstance(activation, keras.layers.advanced_activations.ReLU)
        assert activation.name == "Block_0a_act_1"
        # one step back: the 1x1 convolution
        conv = self.step_in(activation)
        assert isinstance(conv, keras.layers.Conv2D)
        assert conv.filters == 32
        assert conv.padding == 'same'
        assert conv.kernel_size == (1, 1)
        assert conv.strides == (1, 1)
        assert conv.name == "Block_0a_1x1"
        # one step back: the max pooling layer
        pooling = self.step_in(conv)
        assert isinstance(pooling, keras.layers.pooling.MaxPooling2D)
        assert pooling.name == "Block_0a_MaxPool"
        assert pooling.pool_size == (3, 3)
        assert pooling.padding == 'same'
        # with max_pooling=False an average pooling tower is built
        opts = {'input_x': input_x, 'pool_kernel': (3, 3), 'tower_filter': 32}
        tower = base.create_pool_tower(max_pooling=False, **opts)
        pooling = self.step_in(tower._keras_history[0], depth=2)
        assert isinstance(pooling, keras.layers.pooling.AveragePooling2D)
        assert pooling.name == "Block_0b_AvgPool"
        assert pooling.pool_size == (3, 3)
        assert pooling.padding == 'same'

    def test_inception_block(self, base, input_x):
        conv = {'tower_1': {'reduction_filter': 64, 'tower_kernel': (3, 3), 'tower_filter': 64},
                'tower_2': {'reduction_filter': 64, 'tower_kernel': (5, 5), 'tower_filter': 64, 'activation': 'tanh'}}
        pool = {'pool_kernel': (3, 3), 'tower_filter': 64}
        opts = {'input_x': input_x, 'tower_conv_parts': conv, 'tower_pool_parts': pool}
        block = base.inception_block(**opts)
        assert base.number_of_blocks == 1
        # without 'max_pooling' both pooling variants are built -> 4 towers
        concatenated = block._keras_history[0].input
        assert len(concatenated) == 4
        block_1a, block_1b, block_pool1, block_pool2 = concatenated
        assert block_1a.name == 'Block_1a_act_2/Relu:0'
        assert block_1b.name == 'Block_1b_act_2_tanh/Tanh:0'
        assert block_pool1.name == 'Block_1c_act_1/Relu:0'
        assert block_pool2.name == 'Block_1d_act_1/Relu:0'
        assert self.step_in(block_1a._keras_history[0]).name == "Block_1a_3x3"
        assert self.step_in(block_1b._keras_history[0]).name == "Block_1b_5x5"
        assert isinstance(self.step_in(block_pool1._keras_history[0], depth=2), keras.layers.pooling.MaxPooling2D)
        assert isinstance(self.step_in(block_pool2._keras_history[0], depth=2), keras.layers.pooling.AveragePooling2D)
        # stacking a second block with explicit max_pooling -> 3 towers
        opts['input_x'] = block
        opts['tower_pool_parts']['max_pooling'] = True
        block = base.inception_block(**opts)
        assert base.number_of_blocks == 2
        concatenated = block._keras_history[0].input
        assert len(concatenated) == 3
        block_2a, block_2b, block_pool = concatenated
        assert block_2a.name == 'Block_2a_act_2/Relu:0'
        assert block_2b.name == 'Block_2b_act_2_tanh/Tanh:0'
        assert block_pool.name == 'Block_2c_act_1/Relu:0'
        assert self.step_in(block_2a._keras_history[0]).name == "Block_2a_3x3"
        assert self.step_in(block_2b._keras_history[0]).name == "Block_2b_5x5"
        assert isinstance(self.step_in(block_pool._keras_history[0], depth=2), keras.layers.pooling.MaxPooling2D)

    def test_batch_normalisation(self, base, input_x):
        base.part_of_block += 1
        bn = base.batch_normalisation(input_x)._keras_history[0]
        assert isinstance(bn, keras.layers.normalization.BatchNormalization)
        assert bn.name == "Block_0a_BN"