branched_input_networks.py
    from functools import partial, reduce
    
    from tensorflow import keras as keras
    
    from mlair import AbstractModelClass
    from mlair.helpers import select_from_dict
    from mlair.model_modules.loss import var_loss
    from mlair.model_modules.recurrent_networks import RNN
    
    
    class BranchedInputRNN(RNN):  # pragma: no cover
        """A recurrent neural network with multiple input branches."""
    
        def __init__(self, input_shape, output_shape, *args, **kwargs):
    
            super().__init__([input_shape], output_shape, *args, **kwargs)
    
            # building the model, setting compile options, and registering custom objects are
            # already handled by the parent RNN constructor, hence the calls below stay disabled
            # self.set_model()
            # self.set_compile_options()
            # self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)
    
        def set_model(self):
            """
            Build the model.
            """
            if isinstance(self.layer_configuration, tuple):
                n_layer, n_hidden = self.layer_configuration
                conf = [n_hidden for _ in range(n_layer)]
            else:
                assert isinstance(self.layer_configuration, list)
                conf = self.layer_configuration
    
            x_input = []
            x_in = []
    
            for branch in range(len(self._input_shape)):
                shape_b = self._input_shape[branch]
                x_input_b = keras.layers.Input(shape=shape_b)
                x_input.append(x_input_b)
                x_in_b = keras.layers.Reshape((shape_b[0], reduce((lambda x, y: x * y), shape_b[1:])),
                                              name=f"reshape_branch{branch + 1}")(x_input_b)
    
                for layer, n_hidden in enumerate(conf):
                    return_sequences = (layer < len(conf) - 1)
                    x_in_b = self.RNN(n_hidden, return_sequences=return_sequences, recurrent_dropout=self.dropout_rnn,
                                      name=f"{self.RNN.__name__}_branch{branch + 1}_{layer + 1}")(x_in_b)
                    if self.bn is True:
                        x_in_b = keras.layers.BatchNormalization()(x_in_b)
                    x_in_b = self.activation_rnn(name=f"{self.activation_rnn_name}_branch{branch + 1}_{layer + 1}")(x_in_b)
                    if self.dropout is not None:
                        x_in_b = self.dropout(self.dropout_rate)(x_in_b)
                x_in.append(x_in_b)
            x_concat = keras.layers.Concatenate()(x_in)
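            # optionally refine the concatenated branches with one or more dense layers before
            # the final output layer, depending on dense_layer_configuration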
    
            if self.add_dense_layer is True:
                if len(self.dense_layer_configuration) == 0:
                    x_concat = keras.layers.Dense(min(self._output_shape ** 2, conf[-1]), name=f"Dense_{len(conf) + 1}",
                                                  kernel_initializer=self.kernel_initializer, )(x_concat)
                    x_concat = self.activation(name=f"{self.activation_name}_{len(conf) + 1}")(x_concat)
                    if self.dropout is not None:
                        x_concat = self.dropout(self.dropout_rate)(x_concat)
                else:
                    for layer, n_hidden in enumerate(self.dense_layer_configuration):
                        if n_hidden < self._output_shape:
                            break
                        x_concat = keras.layers.Dense(n_hidden, name=f"Dense_{len(conf) + layer + 1}",
                                                      kernel_initializer=self.kernel_initializer, )(x_concat)
                        x_concat = self.activation(name=f"{self.activation_name}_{len(conf) + layer + 1}")(x_concat)
                        if self.dropout is not None:
                            x_concat = self.dropout(self.dropout_rate)(x_concat)
    
            x_concat = keras.layers.Dense(self._output_shape)(x_concat)
            out = self.activation_output(name=f"{self.activation_output_name}_output")(x_concat)
            self.model = keras.Model(inputs=x_input, outputs=[out])
            self.model.summary()  # summary() already prints; avoid printing its None return value
    
        def set_compile_options(self):
            self.compile_options = {"loss": [keras.losses.mean_squared_error],
                                    "metrics": ["mse", "mae", var_loss]}
    
        def _update_model_name(self, rnn_type):
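            # encode the topology in the model name:
            # <RNN type>_<branches x window x variables>_<recurrent layer sizes>_<concat dense sizes>_<output size>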
            n_input = f"{len(self._input_shape)}x{self._input_shape[0][0]}x" \
                      f"{str(reduce(lambda x, y: x * y, self._input_shape[0][1:]))}"
            n_output = str(self._output_shape)
            self.model_name = rnn_type.upper()
            if isinstance(self.layer_configuration, tuple) and len(self.layer_configuration) == 2:
                n_layer, n_hidden = self.layer_configuration
                branch = [f"r{n_hidden}" for _ in range(n_layer)]
            else:
                branch = [f"r{n}" for n in self.layer_configuration]
    
            concat = []
            if self.add_dense_layer is True:
                if len(self.dense_layer_configuration) == 0:
                    n_hidden = min(self._output_shape ** 2, int(branch[-1][1:]))  # strip the leading "r" from entries like "r64"
                    concat.append(f"1x{n_hidden}")
                else:
                    for n_hidden in self.dense_layer_configuration:
                        if n_hidden < self._output_shape:
                            break
                        if len(concat) == 0:
                            concat.append(f"1x{n_hidden}")
                        else:
                            concat.append(str(n_hidden))
            self.model_name += "_".join(["", n_input, *branch, *concat, n_output])
    
    
    class BranchedInputFCN(AbstractModelClass):  # pragma: no cover
        """
        A fully connected network that uses multiple input branches that are combined by a concatenate layer.
        """
    
        _activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"),
                       "sigmoid": partial(keras.layers.Activation, "sigmoid"),
                       "linear": partial(keras.layers.Activation, "linear"),
                       "selu": partial(keras.layers.Activation, "selu"),
                       "prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25)),
                       "leakyrelu": partial(keras.layers.LeakyReLU)}
        _initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform",
                        "relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal(),
                        "prelu": keras.initializers.he_normal()}
        _optimizer = {"adam": keras.optimizers.Adam, "sgd": keras.optimizers.SGD}
        _regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2}
        _requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"]
        _dropout = {"selu": keras.layers.AlphaDropout}
    
        def __init__(self, input_shape: list, output_shape: list, activation="relu", activation_output="linear",
                     optimizer="adam", n_layer=1, n_hidden=10, regularizer=None, dropout=None, layer_configuration=None,
                     batch_normalization=False, **kwargs):
            """
            Sets model and loss depending on the given arguments.
    
            :param input_shape: list of input shapes (expect len=1 with shape=(window_hist, station, variables))
            :param output_shape: list of output shapes (expect len=1 with shape=(window_forecast))
    
            Customize this FCN model via the following parameters:
    
            :param activation: set your desired activation function. Chose from relu, tanh, sigmoid, linear, selu, prelu,
                leakyrelu. (Default relu)
            :param activation_output: same as activation parameter but exclusively applied on output layer only. (Default
                linear)
            :param optimizer: set optimizer method. Can be either adam or sgd. (Default adam)
            :param n_layer: define number of hidden layers in the network. Given number of hidden neurons are used in each
                layer. (Default 1)
            :param n_hidden: define number of hidden units per layer. This number is used in each hidden layer. (Default 10)
            :param layer_configuration: alternative formulation of the network's architecture. This will overwrite the
                settings from n_layer and n_hidden. Provide a list where each element represent the number of units in the
                hidden layer. The number of hidden layers is equal to the total length of this list.
            :param dropout: use dropout with given rate. If no value is provided, dropout layers are not added to the
                network at all. (Default None)
            :param batch_normalization: use batch normalization layer in the network if enabled. These layers are inserted
                between the linear part of a layer (the nn part) and the non-linear part (activation function). No BN layer
                is added if set to false. (Default false)
            """
    
            super().__init__(input_shape, output_shape[0])
    
            # settings
            self.activation = self._set_activation(activation)
            self.activation_name = activation
            self.activation_output = self._set_activation(activation_output)
            self.activation_output_name = activation_output
            self.optimizer = self._set_optimizer(optimizer, **kwargs)
            self.bn = batch_normalization
            self.layer_configuration = (n_layer, n_hidden) if layer_configuration is None else layer_configuration
            self._update_model_name()
            self.kernel_initializer = self._initializer.get(activation, "glorot_uniform")
            self.kernel_regularizer = self._set_regularizer(regularizer, **kwargs)
            self.dropout, self.dropout_rate = self._set_dropout(activation, dropout)
    
            # apply to model
            self.set_model()
            self.set_compile_options()
            self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)
    
        def _set_activation(self, activation):
            try:
                return self._activation[activation.lower()]  # indexing raises KeyError for unsupported names
            except KeyError:
                raise AttributeError(f"Given activation {activation} is not supported in this model class.")
    
        def _set_optimizer(self, optimizer, **kwargs):
            try:
                opt_name = optimizer.lower()
                opt = self._optimizer[opt_name]  # indexing raises KeyError for unsupported names
                opt_kwargs = {}
                if opt_name == "adam":
                    opt_kwargs = select_from_dict(kwargs, ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad"])
                elif opt_name == "sgd":
                    opt_kwargs = select_from_dict(kwargs, ["lr", "momentum", "decay", "nesterov"])
                return opt(**opt_kwargs)
            except KeyError:
                raise AttributeError(f"Given optimizer {optimizer} is not supported in this model class.")
    
        def _set_regularizer(self, regularizer, **kwargs):
            if regularizer is None or (isinstance(regularizer, str) and regularizer.lower() == "none"):
                return None
            try:
                reg_name = regularizer.lower()
                reg = self._regularizer[reg_name]  # indexing raises KeyError for unsupported names
                reg_kwargs = {}
                if reg_name in ["l1", "l2"]:
                    reg_kwargs = select_from_dict(kwargs, reg_name, remove_none=True)
                    if reg_name in reg_kwargs:
                        reg_kwargs["l"] = reg_kwargs.pop(reg_name)
                elif reg_name == "l1_l2":
                    reg_kwargs = select_from_dict(kwargs, ["l1", "l2"], remove_none=True)
                return reg(**reg_kwargs)
            except KeyError:
                raise AttributeError(f"Given regularizer {regularizer} is not supported in this model class.")
    
        def _set_dropout(self, activation, dropout_rate):
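            # selu pairs with AlphaDropout to preserve its self-normalizing property; every other
            # activation falls back to standard Dropout (see the _dropout lookup table above)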
            if dropout_rate is None:
                return None, None
            assert 0 <= dropout_rate < 1
            return self._dropout.get(activation, keras.layers.Dropout), dropout_rate
    
        def _update_model_name(self):
            n_input = f"{len(self._input_shape)}x{str(reduce(lambda x, y: x * y, self._input_shape[0]))}"
            n_output = str(self._output_shape)
    
            if isinstance(self.layer_configuration, tuple) and len(self.layer_configuration) == 2:
                n_layer, n_hidden = self.layer_configuration
                branch = [f"{n_hidden}" for _ in range(n_layer)]
            else:
                branch = [f"{n}" for n in self.layer_configuration]
    
            concat = []
            n_neurons_concat = int(branch[-1]) * len(self._input_shape)
            for exp in reversed(range(2, len(self._input_shape) + 1)):
                n_neurons = self._output_shape ** exp
                if n_neurons < n_neurons_concat:
                    if len(concat) == 0:
                        concat.append(f"1x{n_neurons}")
                    else:
                        concat.append(str(n_neurons))
            self.model_name += "_".join(["", n_input, *branch, *concat, n_output])
    
        def set_model(self):
            """
            Build the model.
            """
    
            if isinstance(self.layer_configuration, tuple):
                n_layer, n_hidden = self.layer_configuration
                conf = [n_hidden for _ in range(n_layer)]
            else:
                assert isinstance(self.layer_configuration, list)
                conf = self.layer_configuration
    
            x_input = []
            x_in = []
    
            for branch in range(len(self._input_shape)):
                x_input_b = keras.layers.Input(shape=self._input_shape[branch])
                x_input.append(x_input_b)
                x_in_b = keras.layers.Flatten()(x_input_b)
    
                for layer, n_hidden in enumerate(conf):
                    x_in_b = keras.layers.Dense(n_hidden, kernel_initializer=self.kernel_initializer,
                                                kernel_regularizer=self.kernel_regularizer,
                                                name=f"Dense_branch{branch + 1}_{layer + 1}")(x_in_b)
                    if self.bn is True:
                        x_in_b = keras.layers.BatchNormalization()(x_in_b)
                    x_in_b = self.activation(name=f"{self.activation_name}_branch{branch + 1}_{layer + 1}")(x_in_b)
                    if self.dropout is not None:
                        x_in_b = self.dropout(self.dropout_rate)(x_in_b)
                x_in.append(x_in_b)
            x_concat = keras.layers.Concatenate()(x_in)
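            # taper the concatenated branches through dense layers whose widths are decreasing
            # powers of the output size (output**n_branches down to output**2), adding a layer
            # only when its width stays below the concatenated width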
    
            n_neurons_concat = int(conf[-1]) * len(self._input_shape)
            layer_concat = 0
            for exp in reversed(range(2, len(self._input_shape) + 1)):
                n_neurons = self._output_shape ** exp
                if n_neurons < n_neurons_concat:
                    layer_concat += 1
                    x_concat = keras.layers.Dense(n_neurons, name=f"Dense_{layer_concat}")(x_concat)
                    if self.bn is True:
                        x_concat = keras.layers.BatchNormalization()(x_concat)
                    x_concat = self.activation(name=f"{self.activation_name}_{layer_concat}")(x_concat)
                    if self.dropout is not None:
                        x_concat = self.dropout(self.dropout_rate)(x_concat)
            x_concat = keras.layers.Dense(self._output_shape)(x_concat)
            out = self.activation_output(name=f"{self.activation_output_name}_output")(x_concat)
            self.model = keras.Model(inputs=x_input, outputs=[out])
            self.model.summary()  # summary() already prints; avoid printing its None return value
    
        def set_compile_options(self):
            self.compile_options = {"loss": [keras.losses.mean_squared_error],
                                    "metrics": ["mse", "mae", var_loss]}
            # self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss], loss_weights=[2, 1])],
            #                         "metrics": ["mse", "mae", var_loss]}