From e3de37e423c5d6e5a106181c2d3ab235fe476b19 Mon Sep 17 00:00:00 2001
From: leufen1 <l.leufen@fz-juelich.de>
Date: Fri, 28 May 2021 10:07:57 +0200
Subject: [PATCH] new FCN class using branched inputs (can be combined with branched filter data handler)

---
 .../model_modules/fully_connected_networks.py | 192 +++++++++++++++++-
 mlair/model_modules/loss.py                   |   4 +-
 2 files changed, 192 insertions(+), 4 deletions(-)

diff --git a/mlair/model_modules/fully_connected_networks.py b/mlair/model_modules/fully_connected_networks.py
index ff06f075..21455383 100644
--- a/mlair/model_modules/fully_connected_networks.py
+++ b/mlair/model_modules/fully_connected_networks.py
@@ -5,7 +5,7 @@ from functools import reduce, partial
 
 from mlair.model_modules import AbstractModelClass
 from mlair.helpers import select_from_dict
-from mlair.model_modules.loss import var_loss, custom_loss
+from mlair.model_modules.loss import var_loss, custom_loss, l_p_loss
 
 import keras
 
@@ -79,7 +79,7 @@ class FCN(AbstractModelClass):
         # apply to model
         self.set_model()
         self.set_compile_options()
-        self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)
+        self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss, l_p_loss=l_p_loss(.5))
 
     def _set_activation(self, activation):
         try:
@@ -190,3 +190,191 @@ class FCN_64_32_16(FCN):
     def _update_model_name(self):
         self.model_name = "FCN"
         super()._update_model_name()
+
+
+class BranchedInputFCN(AbstractModelClass):
+    """
+    A customisable fully connected network with branched inputs, where each branch is processed by its own hidden
+    layers before concatenation, and the last layer is the output layer depending on the window_lead_time parameter.
+    """
+
+    _activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"),
+                   "sigmoid": partial(keras.layers.Activation, "sigmoid"),
+                   "linear": partial(keras.layers.Activation, "linear"),
+                   "selu": partial(keras.layers.Activation, "selu"),
+                   "prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25)),
+                   "leakyrelu": partial(keras.layers.LeakyReLU)}
+    _initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform",
+                    "relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal(),
+                    "prelu": keras.initializers.he_normal()}
+    _optimizer = {"adam": keras.optimizers.adam, "sgd": keras.optimizers.SGD}
+    _regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2}
+    _requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"]
+    _dropout = {"selu": keras.layers.AlphaDropout}
+
+    def __init__(self, input_shape: list, output_shape: list, activation="relu", activation_output="linear",
+                 optimizer="adam", n_layer=1, n_hidden=10, regularizer=None, dropout=None, layer_configuration=None,
+                 batch_normalization=False, **kwargs):
+        """
+        Sets model and loss depending on the given arguments.
+
+        :param input_shape: list of input shapes (one entry per branch with shape=(window_hist, station, variables))
+        :param output_shape: list of output shapes (expect len=1 with shape=(window_forecast))
+
+        Customize this FCN model via the following parameters:
+
+        :param activation: set your desired activation function. Choose from relu, tanh, sigmoid, linear, selu, prelu,
+            leakyrelu. (Default relu)
+        :param activation_output: same as the activation parameter, but applied to the output layer only. (Default
+            linear)
+        :param optimizer: set optimizer method. Can be either adam or sgd. (Default adam)
+        :param n_layer: define the number of hidden layers in each branch of the network. The given number of hidden
+            neurons (n_hidden) is used in each of these layers. (Default 1)
+        :param n_hidden: define number of hidden units per layer. This number is used in each hidden layer. (Default 10)
+        :param layer_configuration: alternative formulation of the network's architecture. This will overwrite the
+            settings from n_layer and n_hidden. Provide a list where each element represents the number of units in
+            the corresponding hidden layer. The number of hidden layers per branch is equal to the total length of this list.
+        :param dropout: use dropout with given rate. If no value is provided, dropout layers are not added to the
+            network at all. (Default None)
+        :param batch_normalization: use batch normalization layers in the network if enabled. These layers are inserted
+            between the linear part of a layer (the nn part) and the non-linear part (activation function). No BN layer
+            is added if set to false. (Default false)
+        """
+
+        super().__init__(input_shape, output_shape[0])
+
+        # settings
+        self.activation = self._set_activation(activation)
+        self.activation_name = activation
+        self.activation_output = self._set_activation(activation_output)
+        self.activation_output_name = activation_output
+        self.optimizer = self._set_optimizer(optimizer, **kwargs)
+        self.bn = batch_normalization
+        self.layer_configuration = (n_layer, n_hidden) if layer_configuration is None else layer_configuration
+        self._update_model_name()
+        self.kernel_initializer = self._initializer.get(activation, "glorot_uniform")
+        self.kernel_regularizer = self._set_regularizer(regularizer, **kwargs)
+        self.dropout, self.dropout_rate = self._set_dropout(activation, dropout)
+
+        # apply to model
+        self.set_model()
+        self.set_compile_options()
+        self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)
+
+    def _set_activation(self, activation):
+        try:
+            return self._activation.get(activation.lower())
+        except KeyError:
+            raise AttributeError(f"Given activation {activation} is not supported in this model class.")
+
+    def _set_optimizer(self, optimizer, **kwargs):
+        try:
+            opt_name = optimizer.lower()
+            opt = self._optimizer.get(opt_name)
+            opt_kwargs = {}
+            if opt_name == "adam":
+                opt_kwargs = select_from_dict(kwargs, ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad"])
+            elif opt_name == "sgd":
+                opt_kwargs = select_from_dict(kwargs, ["lr", "momentum", "decay", "nesterov"])
+            return opt(**opt_kwargs)
+        except KeyError:
+            raise AttributeError(f"Given optimizer {optimizer} is not supported in this model class.")
+
+    def _set_regularizer(self, regularizer, **kwargs):
+        if regularizer is None or (isinstance(regularizer, str) and regularizer.lower() == "none"):
+            return None
+        try:
+            reg_name = regularizer.lower()
+            reg = self._regularizer.get(reg_name)
+            reg_kwargs = {}
+            if reg_name in ["l1", "l2"]:
+                reg_kwargs = select_from_dict(kwargs, reg_name, remove_none=True)
+                if reg_name in reg_kwargs:
+                    reg_kwargs["l"] = reg_kwargs.pop(reg_name)
+            elif reg_name == "l1_l2":
+                reg_kwargs = select_from_dict(kwargs, ["l1", "l2"], remove_none=True)
+            return reg(**reg_kwargs)
+        except KeyError:
+            raise AttributeError(f"Given regularizer {regularizer} is not supported in this model class.")
+
+    def _set_dropout(self, activation, dropout_rate):
+        if dropout_rate is None:
+            return None, None
+        assert 0 <= dropout_rate < 1
+        return self._dropout.get(activation, keras.layers.Dropout), dropout_rate
+
+    def _update_model_name(self):
+        n_input = f"{len(self._input_shape)}x{str(reduce(lambda x, y: x * y, self._input_shape[0]))}"
f"{len(self._input_shape)}x{str(reduce(lambda x, y: x * y, self._input_shape[0]))}" + n_output = str(self._output_shape) + + if isinstance(self.layer_configuration, tuple) and len(self.layer_configuration) == 2: + n_layer, n_hidden = self.layer_configuration + branch = [f"{n_hidden}" for _ in range(n_layer)] + else: + branch = [f"{n}" for n in self.layer_configuration] + + concat = [] + n_neurons_concat = int(branch[-1]) * len(self._input_shape) + for exp in reversed(range(2, len(self._input_shape) + 1)): + n_neurons = self._output_shape ** exp + if n_neurons < n_neurons_concat: + if len(concat) == 0: + concat.append(f"1x{n_neurons}") + else: + concat.append(str(n_neurons)) + self.model_name += "_".join(["", n_input, *branch, *concat, n_output]) + + def set_model(self): + """ + Build the model. + """ + + if isinstance(self.layer_configuration, tuple) is True: + n_layer, n_hidden = self.layer_configuration + conf = [n_hidden for _ in range(n_layer)] + else: + assert isinstance(self.layer_configuration, list) is True + conf = self.layer_configuration + + x_input = [] + x_in = [] + + for branch in range(len(self._input_shape)): + x_input_b = keras.layers.Input(shape=self._input_shape[branch]) + x_input.append(x_input_b) + x_in_b = keras.layers.Flatten()(x_input_b) + + for layer, n_hidden in enumerate(conf): + x_in_b = keras.layers.Dense(n_hidden, kernel_initializer=self.kernel_initializer, + kernel_regularizer=self.kernel_regularizer, + name=f"Dense_branch{branch + 1}_{layer + 1}")(x_in_b) + if self.bn is True: + x_in_b = keras.layers.BatchNormalization()(x_in_b) + x_in_b = self.activation(name=f"{self.activation_name}_branch{branch + 1}_{layer + 1}")(x_in_b) + if self.dropout is not None: + x_in_b = self.dropout(self.dropout_rate)(x_in_b) + x_in.append(x_in_b) + x_concat = keras.layers.Concatenate()(x_in) + + n_neurons_concat = int(conf[-1]) * len(self._input_shape) + layer_concat = 0 + for exp in reversed(range(2, len(self._input_shape) + 1)): + n_neurons = self._output_shape ** exp + if n_neurons < n_neurons_concat: + layer_concat += 1 + x_concat = keras.layers.Dense(n_neurons, name=f"Dense_{layer_concat}")(x_concat) + if self.bn is True: + x_concat = keras.layers.BatchNormalization()(x_concat) + x_concat = self.activation(name=f"{self.activation_name}_{layer_concat}")(x_concat) + if self.dropout is not None: + x_concat = self.dropout(self.dropout_rate)(x_concat) + x_concat = keras.layers.Dense(self._output_shape)(x_concat) + out = self.activation_output(name=f"{self.activation_output_name}_output")(x_concat) + self.model = keras.Model(inputs=x_input, outputs=[out]) + print(self.model.summary()) + + def set_compile_options(self): + # self.compile_options = {"loss": [keras.losses.mean_squared_error], + # "metrics": ["mse", "mae", var_loss]} + self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss], loss_weights=[2, 1])], + "metrics": ["mse", "mae", var_loss]} diff --git a/mlair/model_modules/loss.py b/mlair/model_modules/loss.py index ba871e98..2034c5a7 100644 --- a/mlair/model_modules/loss.py +++ b/mlair/model_modules/loss.py @@ -16,10 +16,10 @@ def l_p_loss(power: int) -> Callable: :return: loss for given power """ - def loss(y_true, y_pred): + def l_p_loss(y_true, y_pred): return K.mean(K.pow(K.abs(y_pred - y_true), power), axis=-1) - return loss + return l_p_loss def var_loss(y_true, y_pred) -> Callable: -- GitLab