Skip to content
Snippets Groups Projects

Resolve "release v1.4.0"

Merged Ghost User requested to merge release_v1.4.0 into master
2 files
+ 192
4
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -5,7 +5,7 @@ from functools import reduce, partial
from mlair.model_modules import AbstractModelClass
from mlair.helpers import select_from_dict
from mlair.model_modules.loss import var_loss, custom_loss
from mlair.model_modules.loss import var_loss, custom_loss, l_p_loss
import keras
@@ -79,7 +79,7 @@ class FCN(AbstractModelClass):
# apply to model
self.set_model()
self.set_compile_options()
self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)
self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss, l_p_loss=l_p_loss(.5))
def _set_activation(self, activation):
try:
@@ -190,3 +190,191 @@ class FCN_64_32_16(FCN):
def _update_model_name(self):
self.model_name = "FCN"
super()._update_model_name()
class BranchedInputFCN(AbstractModelClass):
"""
A customisable fully connected network (64, 32, 16, window_lead_time), where the last layer is the output layer depending
on the window_lead_time parameter.
"""
_activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"),
"sigmoid": partial(keras.layers.Activation, "sigmoid"),
"linear": partial(keras.layers.Activation, "linear"),
"selu": partial(keras.layers.Activation, "selu"),
"prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25)),
"leakyrelu": partial(keras.layers.LeakyReLU)}
_initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform",
"relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal(),
"prelu": keras.initializers.he_normal()}
_optimizer = {"adam": keras.optimizers.adam, "sgd": keras.optimizers.SGD}
_regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2}
_requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"]
_dropout = {"selu": keras.layers.AlphaDropout}
def __init__(self, input_shape: list, output_shape: list, activation="relu", activation_output="linear",
optimizer="adam", n_layer=1, n_hidden=10, regularizer=None, dropout=None, layer_configuration=None,
batch_normalization=False, **kwargs):
"""
Sets model and loss depending on the given arguments.
:param input_shape: list of input shapes (expect len=1 with shape=(window_hist, station, variables))
:param output_shape: list of output shapes (expect len=1 with shape=(window_forecast))
Customize this FCN model via the following parameters:
:param activation: set your desired activation function. Chose from relu, tanh, sigmoid, linear, selu, prelu,
leakyrelu. (Default relu)
:param activation_output: same as activation parameter but exclusively applied on output layer only. (Default
linear)
:param optimizer: set optimizer method. Can be either adam or sgd. (Default adam)
:param n_layer: define number of hidden layers in the network. Given number of hidden neurons are used in each
layer. (Default 1)
:param n_hidden: define number of hidden units per layer. This number is used in each hidden layer. (Default 10)
:param layer_configuration: alternative formulation of the network's architecture. This will overwrite the
settings from n_layer and n_hidden. Provide a list where each element represent the number of units in the
hidden layer. The number of hidden layers is equal to the total length of this list.
:param dropout: use dropout with given rate. If no value is provided, dropout layers are not added to the
network at all. (Default None)
:param batch_normalization: use batch normalization layer in the network if enabled. These layers are inserted
between the linear part of a layer (the nn part) and the non-linear part (activation function). No BN layer
is added if set to false. (Default false)
"""
super().__init__(input_shape, output_shape[0])
# settings
self.activation = self._set_activation(activation)
self.activation_name = activation
self.activation_output = self._set_activation(activation_output)
self.activation_output_name = activation_output
self.optimizer = self._set_optimizer(optimizer, **kwargs)
self.bn = batch_normalization
self.layer_configuration = (n_layer, n_hidden) if layer_configuration is None else layer_configuration
self._update_model_name()
self.kernel_initializer = self._initializer.get(activation, "glorot_uniform")
self.kernel_regularizer = self._set_regularizer(regularizer, **kwargs)
self.dropout, self.dropout_rate = self._set_dropout(activation, dropout)
# apply to model
self.set_model()
self.set_compile_options()
self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)
def _set_activation(self, activation):
try:
return self._activation.get(activation.lower())
except KeyError:
raise AttributeError(f"Given activation {activation} is not supported in this model class.")
def _set_optimizer(self, optimizer, **kwargs):
try:
opt_name = optimizer.lower()
opt = self._optimizer.get(opt_name)
opt_kwargs = {}
if opt_name == "adam":
opt_kwargs = select_from_dict(kwargs, ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad"])
elif opt_name == "sgd":
opt_kwargs = select_from_dict(kwargs, ["lr", "momentum", "decay", "nesterov"])
return opt(**opt_kwargs)
except KeyError:
raise AttributeError(f"Given optimizer {optimizer} is not supported in this model class.")
def _set_regularizer(self, regularizer, **kwargs):
if regularizer is None or (isinstance(regularizer, str) and regularizer.lower() == "none"):
return None
try:
reg_name = regularizer.lower()
reg = self._regularizer.get(reg_name)
reg_kwargs = {}
if reg_name in ["l1", "l2"]:
reg_kwargs = select_from_dict(kwargs, reg_name, remove_none=True)
if reg_name in reg_kwargs:
reg_kwargs["l"] = reg_kwargs.pop(reg_name)
elif reg_name == "l1_l2":
reg_kwargs = select_from_dict(kwargs, ["l1", "l2"], remove_none=True)
return reg(**reg_kwargs)
except KeyError:
raise AttributeError(f"Given regularizer {regularizer} is not supported in this model class.")
def _set_dropout(self, activation, dropout_rate):
if dropout_rate is None:
return None, None
assert 0 <= dropout_rate < 1
return self._dropout.get(activation, keras.layers.Dropout), dropout_rate
def _update_model_name(self):
n_input = f"{len(self._input_shape)}x{str(reduce(lambda x, y: x * y, self._input_shape[0]))}"
n_output = str(self._output_shape)
if isinstance(self.layer_configuration, tuple) and len(self.layer_configuration) == 2:
n_layer, n_hidden = self.layer_configuration
branch = [f"{n_hidden}" for _ in range(n_layer)]
else:
branch = [f"{n}" for n in self.layer_configuration]
concat = []
n_neurons_concat = int(branch[-1]) * len(self._input_shape)
for exp in reversed(range(2, len(self._input_shape) + 1)):
n_neurons = self._output_shape ** exp
if n_neurons < n_neurons_concat:
if len(concat) == 0:
concat.append(f"1x{n_neurons}")
else:
concat.append(str(n_neurons))
self.model_name += "_".join(["", n_input, *branch, *concat, n_output])
def set_model(self):
"""
Build the model.
"""
if isinstance(self.layer_configuration, tuple) is True:
n_layer, n_hidden = self.layer_configuration
conf = [n_hidden for _ in range(n_layer)]
else:
assert isinstance(self.layer_configuration, list) is True
conf = self.layer_configuration
x_input = []
x_in = []
for branch in range(len(self._input_shape)):
x_input_b = keras.layers.Input(shape=self._input_shape[branch])
x_input.append(x_input_b)
x_in_b = keras.layers.Flatten()(x_input_b)
for layer, n_hidden in enumerate(conf):
x_in_b = keras.layers.Dense(n_hidden, kernel_initializer=self.kernel_initializer,
kernel_regularizer=self.kernel_regularizer,
name=f"Dense_branch{branch + 1}_{layer + 1}")(x_in_b)
if self.bn is True:
x_in_b = keras.layers.BatchNormalization()(x_in_b)
x_in_b = self.activation(name=f"{self.activation_name}_branch{branch + 1}_{layer + 1}")(x_in_b)
if self.dropout is not None:
x_in_b = self.dropout(self.dropout_rate)(x_in_b)
x_in.append(x_in_b)
x_concat = keras.layers.Concatenate()(x_in)
n_neurons_concat = int(conf[-1]) * len(self._input_shape)
layer_concat = 0
for exp in reversed(range(2, len(self._input_shape) + 1)):
n_neurons = self._output_shape ** exp
if n_neurons < n_neurons_concat:
layer_concat += 1
x_concat = keras.layers.Dense(n_neurons, name=f"Dense_{layer_concat}")(x_concat)
if self.bn is True:
x_concat = keras.layers.BatchNormalization()(x_concat)
x_concat = self.activation(name=f"{self.activation_name}_{layer_concat}")(x_concat)
if self.dropout is not None:
x_concat = self.dropout(self.dropout_rate)(x_concat)
x_concat = keras.layers.Dense(self._output_shape)(x_concat)
out = self.activation_output(name=f"{self.activation_output_name}_output")(x_concat)
self.model = keras.Model(inputs=x_input, outputs=[out])
print(self.model.summary())
def set_compile_options(self):
# self.compile_options = {"loss": [keras.losses.mean_squared_error],
# "metrics": ["mse", "mae", var_loss]}
self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss], loss_weights=[2, 1])],
"metrics": ["mse", "mae", var_loss]}
Loading