Skip to content
Snippets Groups Projects
Commit 2015c09d authored by leufen1's avatar leufen1
Browse files

first implementation of branched rnn

parent 01043acd
No related branches found
No related tags found
6 merge requests!430update recent developments,!413update release branch,!412Resolve "release v2.0.0",!395Lukas issue362 feat branched rnn,!390Lukas issue362 feat branched rnn,!388Resolve "branched rnn model class"
Pipeline #92671 passed
from functools import partial, reduce
from tensorflow import keras as keras
from mlair import AbstractModelClass
from mlair.helpers import select_from_dict
from mlair.model_modules.loss import var_loss
from mlair.model_modules.recurrent_networks import RNN
class BranchInputRNN(RNN): # pragma: no cover
"""A recurrent neural network with multiple input branches."""
def __init__(self, input_shape, output_shape, *args, **kwargs):
super().__init__([input_shape], output_shape, *args, **kwargs)
# apply to model
# self.set_model()
# self.set_compile_options()
# self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)
def set_model(self):
"""
Build the model.
"""
if isinstance(self.layer_configuration, tuple) is True:
n_layer, n_hidden = self.layer_configuration
conf = [n_hidden for _ in range(n_layer)]
else:
assert isinstance(self.layer_configuration, list) is True
conf = self.layer_configuration
x_input = []
x_in = []
for branch in range(len(self._input_shape)):
shape_b = self._input_shape[branch]
x_input_b = keras.layers.Input(shape=shape_b)
x_in_b = keras.layers.Reshape((shape_b[0], reduce((lambda x, y: x * y), shape_b[1:])))(x_input_b)
for layer, n_hidden in enumerate(conf):
return_sequences = (layer < len(conf) - 1)
x_in_b = self.RNN(n_hidden, return_sequences=return_sequences, recurrent_dropout=self.dropout_rnn,
name=f"{self.RNN.__name__}_branch{branch + 1}_{layer + 1}")(x_in_b)
if self.bn is True:
x_in_b = keras.layers.BatchNormalization()(x_in_b)
x_in_b = self.activation_rnn(name=f"{self.activation_rnn_name}_branch{branch + 1}_{layer + 1}")(x_in_b)
if self.dropout is not None:
x_in_b = self.dropout(self.dropout_rate)(x_in_b)
x_in.append(x_in_b)
x_concat = keras.layers.Concatenate()(x_in)
if self.add_dense_layer is True:
if len(self.dense_layer_configuration) == 0:
x_concat = keras.layers.Dense(min(self._output_shape ** 2, conf[-1]), name=f"Dense_{len(conf) + 1}",
kernel_initializer=self.kernel_initializer, )(x_concat)
x_concat = self.activation(name=f"{self.activation_name}_{len(conf) + 1}")(x_concat)
if self.dropout is not None:
x_concat = self.dropout(self.dropout_rate)(x_concat)
else:
for layer, n_hidden in enumerate(self.dense_layer_configuration):
if n_hidden < self._output_shape:
break
x_concat = keras.layers.Dense(n_hidden, name=f"Dense_{len(conf) + layer + 1}",
kernel_initializer=self.kernel_initializer, )(x_concat)
x_concat = self.activation(name=f"{self.activation_name}_{len(conf) + layer + 1}")(x_concat)
if self.dropout is not None:
x_concat = self.dropout(self.dropout_rate)(x_concat)
x_concat = keras.layers.Dense(self._output_shape)(x_concat)
out = self.activation_output(name=f"{self.activation_output_name}_output")(x_in)
self.model = keras.Model(inputs=x_input, outputs=[out])
print(self.model.summary())
def set_compile_options(self):
self.compile_options = {"loss": [keras.losses.mean_squared_error],
"metrics": ["mse", "mae", var_loss]}
def _update_model_name(self, rnn_type):
# n_input = str(reduce(lambda x, y: x * y, self._input_shape))
n_input = f"{len(self._input_shape)}x{str(reduce(lambda x, y: x * y, self._input_shape[0]))}"
n_output = str(self._output_shape)
self.model_name = rnn_type.upper()
if isinstance(self.layer_configuration, tuple) and len(self.layer_configuration) == 2:
n_layer, n_hidden = self.layer_configuration
branch = [f"r{n_hidden}" for _ in range(n_layer)]
else:
branch = [f"r{n}" for n in self.layer_configuration]
concat = []
if self.add_dense_layer is True:
if len(self.dense_layer_configuration) == 0:
n_hidden = min(self._output_shape ** 2, int(branch[-1]))
concat.append(f"1x{n_hidden}")
else:
for n_hidden in self.dense_layer_configuration:
if n_hidden < self._output_shape:
break
if len(concat) == 0:
concat.append(f"1x{n_hidden}")
else:
concat.append(str(n_hidden))
self.model_name += "_".join(["", n_input, *branch, *concat, n_output])
class BranchedInputFCN(AbstractModelClass): # pragma: no cover
"""
A fully connected network that uses multiple input branches that are combined by a concatenate layer.
"""
_activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"),
"sigmoid": partial(keras.layers.Activation, "sigmoid"),
"linear": partial(keras.layers.Activation, "linear"),
"selu": partial(keras.layers.Activation, "selu"),
"prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25)),
"leakyrelu": partial(keras.layers.LeakyReLU)}
_initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform",
"relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal(),
"prelu": keras.initializers.he_normal()}
_optimizer = {"adam": keras.optimizers.Adam, "sgd": keras.optimizers.SGD}
_regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2}
_requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"]
_dropout = {"selu": keras.layers.AlphaDropout}
def __init__(self, input_shape: list, output_shape: list, activation="relu", activation_output="linear",
optimizer="adam", n_layer=1, n_hidden=10, regularizer=None, dropout=None, layer_configuration=None,
batch_normalization=False, **kwargs):
"""
Sets model and loss depending on the given arguments.
:param input_shape: list of input shapes (expect len=1 with shape=(window_hist, station, variables))
:param output_shape: list of output shapes (expect len=1 with shape=(window_forecast))
Customize this FCN model via the following parameters:
:param activation: set your desired activation function. Chose from relu, tanh, sigmoid, linear, selu, prelu,
leakyrelu. (Default relu)
:param activation_output: same as activation parameter but exclusively applied on output layer only. (Default
linear)
:param optimizer: set optimizer method. Can be either adam or sgd. (Default adam)
:param n_layer: define number of hidden layers in the network. Given number of hidden neurons are used in each
layer. (Default 1)
:param n_hidden: define number of hidden units per layer. This number is used in each hidden layer. (Default 10)
:param layer_configuration: alternative formulation of the network's architecture. This will overwrite the
settings from n_layer and n_hidden. Provide a list where each element represent the number of units in the
hidden layer. The number of hidden layers is equal to the total length of this list.
:param dropout: use dropout with given rate. If no value is provided, dropout layers are not added to the
network at all. (Default None)
:param batch_normalization: use batch normalization layer in the network if enabled. These layers are inserted
between the linear part of a layer (the nn part) and the non-linear part (activation function). No BN layer
is added if set to false. (Default false)
"""
super().__init__(input_shape, output_shape[0])
# settings
self.activation = self._set_activation(activation)
self.activation_name = activation
self.activation_output = self._set_activation(activation_output)
self.activation_output_name = activation_output
self.optimizer = self._set_optimizer(optimizer, **kwargs)
self.bn = batch_normalization
self.layer_configuration = (n_layer, n_hidden) if layer_configuration is None else layer_configuration
self._update_model_name()
self.kernel_initializer = self._initializer.get(activation, "glorot_uniform")
self.kernel_regularizer = self._set_regularizer(regularizer, **kwargs)
self.dropout, self.dropout_rate = self._set_dropout(activation, dropout)
# apply to model
self.set_model()
self.set_compile_options()
self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)
def _set_activation(self, activation):
try:
return self._activation.get(activation.lower())
except KeyError:
raise AttributeError(f"Given activation {activation} is not supported in this model class.")
def _set_optimizer(self, optimizer, **kwargs):
try:
opt_name = optimizer.lower()
opt = self._optimizer.get(opt_name)
opt_kwargs = {}
if opt_name == "adam":
opt_kwargs = select_from_dict(kwargs, ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad"])
elif opt_name == "sgd":
opt_kwargs = select_from_dict(kwargs, ["lr", "momentum", "decay", "nesterov"])
return opt(**opt_kwargs)
except KeyError:
raise AttributeError(f"Given optimizer {optimizer} is not supported in this model class.")
def _set_regularizer(self, regularizer, **kwargs):
if regularizer is None or (isinstance(regularizer, str) and regularizer.lower() == "none"):
return None
try:
reg_name = regularizer.lower()
reg = self._regularizer.get(reg_name)
reg_kwargs = {}
if reg_name in ["l1", "l2"]:
reg_kwargs = select_from_dict(kwargs, reg_name, remove_none=True)
if reg_name in reg_kwargs:
reg_kwargs["l"] = reg_kwargs.pop(reg_name)
elif reg_name == "l1_l2":
reg_kwargs = select_from_dict(kwargs, ["l1", "l2"], remove_none=True)
return reg(**reg_kwargs)
except KeyError:
raise AttributeError(f"Given regularizer {regularizer} is not supported in this model class.")
def _set_dropout(self, activation, dropout_rate):
if dropout_rate is None:
return None, None
assert 0 <= dropout_rate < 1
return self._dropout.get(activation, keras.layers.Dropout), dropout_rate
def _update_model_name(self):
n_input = f"{len(self._input_shape)}x{str(reduce(lambda x, y: x * y, self._input_shape[0]))}"
n_output = str(self._output_shape)
if isinstance(self.layer_configuration, tuple) and len(self.layer_configuration) == 2:
n_layer, n_hidden = self.layer_configuration
branch = [f"{n_hidden}" for _ in range(n_layer)]
else:
branch = [f"{n}" for n in self.layer_configuration]
concat = []
n_neurons_concat = int(branch[-1]) * len(self._input_shape)
for exp in reversed(range(2, len(self._input_shape) + 1)):
n_neurons = self._output_shape ** exp
if n_neurons < n_neurons_concat:
if len(concat) == 0:
concat.append(f"1x{n_neurons}")
else:
concat.append(str(n_neurons))
self.model_name += "_".join(["", n_input, *branch, *concat, n_output])
def set_model(self):
"""
Build the model.
"""
if isinstance(self.layer_configuration, tuple) is True:
n_layer, n_hidden = self.layer_configuration
conf = [n_hidden for _ in range(n_layer)]
else:
assert isinstance(self.layer_configuration, list) is True
conf = self.layer_configuration
x_input = []
x_in = []
for branch in range(len(self._input_shape)):
x_input_b = keras.layers.Input(shape=self._input_shape[branch])
x_input.append(x_input_b)
x_in_b = keras.layers.Flatten()(x_input_b)
for layer, n_hidden in enumerate(conf):
x_in_b = keras.layers.Dense(n_hidden, kernel_initializer=self.kernel_initializer,
kernel_regularizer=self.kernel_regularizer,
name=f"Dense_branch{branch + 1}_{layer + 1}")(x_in_b)
if self.bn is True:
x_in_b = keras.layers.BatchNormalization()(x_in_b)
x_in_b = self.activation(name=f"{self.activation_name}_branch{branch + 1}_{layer + 1}")(x_in_b)
if self.dropout is not None:
x_in_b = self.dropout(self.dropout_rate)(x_in_b)
x_in.append(x_in_b)
x_concat = keras.layers.Concatenate()(x_in)
n_neurons_concat = int(conf[-1]) * len(self._input_shape)
layer_concat = 0
for exp in reversed(range(2, len(self._input_shape) + 1)):
n_neurons = self._output_shape ** exp
if n_neurons < n_neurons_concat:
layer_concat += 1
x_concat = keras.layers.Dense(n_neurons, name=f"Dense_{layer_concat}")(x_concat)
if self.bn is True:
x_concat = keras.layers.BatchNormalization()(x_concat)
x_concat = self.activation(name=f"{self.activation_name}_{layer_concat}")(x_concat)
if self.dropout is not None:
x_concat = self.dropout(self.dropout_rate)(x_concat)
x_concat = keras.layers.Dense(self._output_shape)(x_concat)
out = self.activation_output(name=f"{self.activation_output_name}_output")(x_concat)
self.model = keras.Model(inputs=x_input, outputs=[out])
print(self.model.summary())
def set_compile_options(self):
self.compile_options = {"loss": [keras.losses.mean_squared_error],
"metrics": ["mse", "mae", var_loss]}
# self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss], loss_weights=[2, 1])],
# "metrics": ["mse", "mae", var_loss]}
\ No newline at end of file
......@@ -190,191 +190,3 @@ class FCN_64_32_16(FCN):
def _update_model_name(self):
self.model_name = "FCN"
super()._update_model_name()
class BranchedInputFCN(AbstractModelClass): # pragma: no cover
"""
A customisable fully connected network (64, 32, 16, window_lead_time), where the last layer is the output layer depending
on the window_lead_time parameter.
"""
_activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"),
"sigmoid": partial(keras.layers.Activation, "sigmoid"),
"linear": partial(keras.layers.Activation, "linear"),
"selu": partial(keras.layers.Activation, "selu"),
"prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25)),
"leakyrelu": partial(keras.layers.LeakyReLU)}
_initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform",
"relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal(),
"prelu": keras.initializers.he_normal()}
_optimizer = {"adam": keras.optimizers.Adam, "sgd": keras.optimizers.SGD}
_regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2}
_requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"]
_dropout = {"selu": keras.layers.AlphaDropout}
def __init__(self, input_shape: list, output_shape: list, activation="relu", activation_output="linear",
optimizer="adam", n_layer=1, n_hidden=10, regularizer=None, dropout=None, layer_configuration=None,
batch_normalization=False, **kwargs):
"""
Sets model and loss depending on the given arguments.
:param input_shape: list of input shapes (expect len=1 with shape=(window_hist, station, variables))
:param output_shape: list of output shapes (expect len=1 with shape=(window_forecast))
Customize this FCN model via the following parameters:
:param activation: set your desired activation function. Chose from relu, tanh, sigmoid, linear, selu, prelu,
leakyrelu. (Default relu)
:param activation_output: same as activation parameter but exclusively applied on output layer only. (Default
linear)
:param optimizer: set optimizer method. Can be either adam or sgd. (Default adam)
:param n_layer: define number of hidden layers in the network. Given number of hidden neurons are used in each
layer. (Default 1)
:param n_hidden: define number of hidden units per layer. This number is used in each hidden layer. (Default 10)
:param layer_configuration: alternative formulation of the network's architecture. This will overwrite the
settings from n_layer and n_hidden. Provide a list where each element represent the number of units in the
hidden layer. The number of hidden layers is equal to the total length of this list.
:param dropout: use dropout with given rate. If no value is provided, dropout layers are not added to the
network at all. (Default None)
:param batch_normalization: use batch normalization layer in the network if enabled. These layers are inserted
between the linear part of a layer (the nn part) and the non-linear part (activation function). No BN layer
is added if set to false. (Default false)
"""
super().__init__(input_shape, output_shape[0])
# settings
self.activation = self._set_activation(activation)
self.activation_name = activation
self.activation_output = self._set_activation(activation_output)
self.activation_output_name = activation_output
self.optimizer = self._set_optimizer(optimizer, **kwargs)
self.bn = batch_normalization
self.layer_configuration = (n_layer, n_hidden) if layer_configuration is None else layer_configuration
self._update_model_name()
self.kernel_initializer = self._initializer.get(activation, "glorot_uniform")
self.kernel_regularizer = self._set_regularizer(regularizer, **kwargs)
self.dropout, self.dropout_rate = self._set_dropout(activation, dropout)
# apply to model
self.set_model()
self.set_compile_options()
self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)
def _set_activation(self, activation):
try:
return self._activation.get(activation.lower())
except KeyError:
raise AttributeError(f"Given activation {activation} is not supported in this model class.")
def _set_optimizer(self, optimizer, **kwargs):
try:
opt_name = optimizer.lower()
opt = self._optimizer.get(opt_name)
opt_kwargs = {}
if opt_name == "adam":
opt_kwargs = select_from_dict(kwargs, ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad"])
elif opt_name == "sgd":
opt_kwargs = select_from_dict(kwargs, ["lr", "momentum", "decay", "nesterov"])
return opt(**opt_kwargs)
except KeyError:
raise AttributeError(f"Given optimizer {optimizer} is not supported in this model class.")
def _set_regularizer(self, regularizer, **kwargs):
if regularizer is None or (isinstance(regularizer, str) and regularizer.lower() == "none"):
return None
try:
reg_name = regularizer.lower()
reg = self._regularizer.get(reg_name)
reg_kwargs = {}
if reg_name in ["l1", "l2"]:
reg_kwargs = select_from_dict(kwargs, reg_name, remove_none=True)
if reg_name in reg_kwargs:
reg_kwargs["l"] = reg_kwargs.pop(reg_name)
elif reg_name == "l1_l2":
reg_kwargs = select_from_dict(kwargs, ["l1", "l2"], remove_none=True)
return reg(**reg_kwargs)
except KeyError:
raise AttributeError(f"Given regularizer {regularizer} is not supported in this model class.")
def _set_dropout(self, activation, dropout_rate):
if dropout_rate is None:
return None, None
assert 0 <= dropout_rate < 1
return self._dropout.get(activation, keras.layers.Dropout), dropout_rate
def _update_model_name(self):
n_input = f"{len(self._input_shape)}x{str(reduce(lambda x, y: x * y, self._input_shape[0]))}"
n_output = str(self._output_shape)
if isinstance(self.layer_configuration, tuple) and len(self.layer_configuration) == 2:
n_layer, n_hidden = self.layer_configuration
branch = [f"{n_hidden}" for _ in range(n_layer)]
else:
branch = [f"{n}" for n in self.layer_configuration]
concat = []
n_neurons_concat = int(branch[-1]) * len(self._input_shape)
for exp in reversed(range(2, len(self._input_shape) + 1)):
n_neurons = self._output_shape ** exp
if n_neurons < n_neurons_concat:
if len(concat) == 0:
concat.append(f"1x{n_neurons}")
else:
concat.append(str(n_neurons))
self.model_name += "_".join(["", n_input, *branch, *concat, n_output])
def set_model(self):
"""
Build the model.
"""
if isinstance(self.layer_configuration, tuple) is True:
n_layer, n_hidden = self.layer_configuration
conf = [n_hidden for _ in range(n_layer)]
else:
assert isinstance(self.layer_configuration, list) is True
conf = self.layer_configuration
x_input = []
x_in = []
for branch in range(len(self._input_shape)):
x_input_b = keras.layers.Input(shape=self._input_shape[branch])
x_input.append(x_input_b)
x_in_b = keras.layers.Flatten()(x_input_b)
for layer, n_hidden in enumerate(conf):
x_in_b = keras.layers.Dense(n_hidden, kernel_initializer=self.kernel_initializer,
kernel_regularizer=self.kernel_regularizer,
name=f"Dense_branch{branch + 1}_{layer + 1}")(x_in_b)
if self.bn is True:
x_in_b = keras.layers.BatchNormalization()(x_in_b)
x_in_b = self.activation(name=f"{self.activation_name}_branch{branch + 1}_{layer + 1}")(x_in_b)
if self.dropout is not None:
x_in_b = self.dropout(self.dropout_rate)(x_in_b)
x_in.append(x_in_b)
x_concat = keras.layers.Concatenate()(x_in)
n_neurons_concat = int(conf[-1]) * len(self._input_shape)
layer_concat = 0
for exp in reversed(range(2, len(self._input_shape) + 1)):
n_neurons = self._output_shape ** exp
if n_neurons < n_neurons_concat:
layer_concat += 1
x_concat = keras.layers.Dense(n_neurons, name=f"Dense_{layer_concat}")(x_concat)
if self.bn is True:
x_concat = keras.layers.BatchNormalization()(x_concat)
x_concat = self.activation(name=f"{self.activation_name}_{layer_concat}")(x_concat)
if self.dropout is not None:
x_concat = self.dropout(self.dropout_rate)(x_concat)
x_concat = keras.layers.Dense(self._output_shape)(x_concat)
out = self.activation_output(name=f"{self.activation_output_name}_output")(x_concat)
self.model = keras.Model(inputs=x_input, outputs=[out])
print(self.model.summary())
def set_compile_options(self):
self.compile_options = {"loss": [keras.losses.mean_squared_error],
"metrics": ["mse", "mae", var_loss]}
# self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss], loss_weights=[2, 1])],
# "metrics": ["mse", "mae", var_loss]}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment