Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
MLAir
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container registry
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
esde
machine-learning
MLAir
Merge requests
!318
Resolve "release v1.4.0"
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
Resolve "release v1.4.0"
release_v1.4.0
into
master
Overview
0
Commits
229
Pipelines
3
Changes
2
Merged
Ghost User
requested to merge
release_v1.4.0
into
master
3 years ago
Overview
0
Commits
229
Pipelines
3
Changes
2
Expand
Closes
#317 (closed)
Edited
3 years ago
by
Ghost User
0
0
Merge request reports
Viewing commit
e3de37e4
Prev
Next
Show latest version
2 files
+
192
−
4
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
2
Search (e.g. *.vue) (Ctrl+P)
e3de37e4
new FCN class using branched inputs (can be combined with branched filter data handler)
· e3de37e4
leufen1
authored
3 years ago
mlair/model_modules/fully_connected_networks.py
+
190
−
2
Options
@@ -5,7 +5,7 @@ from functools import reduce, partial
from
mlair.model_modules
import
AbstractModelClass
from
mlair.helpers
import
select_from_dict
from
mlair.model_modules.loss
import
var_loss
,
custom_loss
from
mlair.model_modules.loss
import
var_loss
,
custom_loss
,
l_p_loss
import
keras
@@ -79,7 +79,7 @@ class FCN(AbstractModelClass):
# apply to model
self
.
set_model
()
self
.
set_compile_options
()
self
.
set_custom_objects
(
loss
=
self
.
compile_options
[
"
loss
"
][
0
],
var_loss
=
var_loss
)
self
.
set_custom_objects
(
loss
=
self
.
compile_options
[
"
loss
"
][
0
],
var_loss
=
var_loss
,
l_p_loss
=
l_p_loss
(.
5
)
)
def
_set_activation
(
self
,
activation
):
try
:
@@ -190,3 +190,191 @@ class FCN_64_32_16(FCN):
def
_update_model_name
(
self
):
self
.
model_name
=
"
FCN
"
super
().
_update_model_name
()
class
BranchedInputFCN
(
AbstractModelClass
):
"""
A customisable fully connected network (64, 32, 16, window_lead_time), where the last layer is the output layer depending
on the window_lead_time parameter.
"""
_activation
=
{
"
relu
"
:
keras
.
layers
.
ReLU
,
"
tanh
"
:
partial
(
keras
.
layers
.
Activation
,
"
tanh
"
),
"
sigmoid
"
:
partial
(
keras
.
layers
.
Activation
,
"
sigmoid
"
),
"
linear
"
:
partial
(
keras
.
layers
.
Activation
,
"
linear
"
),
"
selu
"
:
partial
(
keras
.
layers
.
Activation
,
"
selu
"
),
"
prelu
"
:
partial
(
keras
.
layers
.
PReLU
,
alpha_initializer
=
keras
.
initializers
.
constant
(
value
=
0.25
)),
"
leakyrelu
"
:
partial
(
keras
.
layers
.
LeakyReLU
)}
_initializer
=
{
"
tanh
"
:
"
glorot_uniform
"
,
"
sigmoid
"
:
"
glorot_uniform
"
,
"
linear
"
:
"
glorot_uniform
"
,
"
relu
"
:
keras
.
initializers
.
he_normal
(),
"
selu
"
:
keras
.
initializers
.
lecun_normal
(),
"
prelu
"
:
keras
.
initializers
.
he_normal
()}
_optimizer
=
{
"
adam
"
:
keras
.
optimizers
.
adam
,
"
sgd
"
:
keras
.
optimizers
.
SGD
}
_regularizer
=
{
"
l1
"
:
keras
.
regularizers
.
l1
,
"
l2
"
:
keras
.
regularizers
.
l2
,
"
l1_l2
"
:
keras
.
regularizers
.
l1_l2
}
_requirements
=
[
"
lr
"
,
"
beta_1
"
,
"
beta_2
"
,
"
epsilon
"
,
"
decay
"
,
"
amsgrad
"
,
"
momentum
"
,
"
nesterov
"
,
"
l1
"
,
"
l2
"
]
_dropout
=
{
"
selu
"
:
keras
.
layers
.
AlphaDropout
}
def
__init__
(
self
,
input_shape
:
list
,
output_shape
:
list
,
activation
=
"
relu
"
,
activation_output
=
"
linear
"
,
optimizer
=
"
adam
"
,
n_layer
=
1
,
n_hidden
=
10
,
regularizer
=
None
,
dropout
=
None
,
layer_configuration
=
None
,
batch_normalization
=
False
,
**
kwargs
):
"""
Sets model and loss depending on the given arguments.
:param input_shape: list of input shapes (expect len=1 with shape=(window_hist, station, variables))
:param output_shape: list of output shapes (expect len=1 with shape=(window_forecast))
Customize this FCN model via the following parameters:
:param activation: set your desired activation function. Chose from relu, tanh, sigmoid, linear, selu, prelu,
leakyrelu. (Default relu)
:param activation_output: same as activation parameter but exclusively applied on output layer only. (Default
linear)
:param optimizer: set optimizer method. Can be either adam or sgd. (Default adam)
:param n_layer: define number of hidden layers in the network. Given number of hidden neurons are used in each
layer. (Default 1)
:param n_hidden: define number of hidden units per layer. This number is used in each hidden layer. (Default 10)
:param layer_configuration: alternative formulation of the network
'
s architecture. This will overwrite the
settings from n_layer and n_hidden. Provide a list where each element represent the number of units in the
hidden layer. The number of hidden layers is equal to the total length of this list.
:param dropout: use dropout with given rate. If no value is provided, dropout layers are not added to the
network at all. (Default None)
:param batch_normalization: use batch normalization layer in the network if enabled. These layers are inserted
between the linear part of a layer (the nn part) and the non-linear part (activation function). No BN layer
is added if set to false. (Default false)
"""
super
().
__init__
(
input_shape
,
output_shape
[
0
])
# settings
self
.
activation
=
self
.
_set_activation
(
activation
)
self
.
activation_name
=
activation
self
.
activation_output
=
self
.
_set_activation
(
activation_output
)
self
.
activation_output_name
=
activation_output
self
.
optimizer
=
self
.
_set_optimizer
(
optimizer
,
**
kwargs
)
self
.
bn
=
batch_normalization
self
.
layer_configuration
=
(
n_layer
,
n_hidden
)
if
layer_configuration
is
None
else
layer_configuration
self
.
_update_model_name
()
self
.
kernel_initializer
=
self
.
_initializer
.
get
(
activation
,
"
glorot_uniform
"
)
self
.
kernel_regularizer
=
self
.
_set_regularizer
(
regularizer
,
**
kwargs
)
self
.
dropout
,
self
.
dropout_rate
=
self
.
_set_dropout
(
activation
,
dropout
)
# apply to model
self
.
set_model
()
self
.
set_compile_options
()
self
.
set_custom_objects
(
loss
=
self
.
compile_options
[
"
loss
"
][
0
],
var_loss
=
var_loss
)
def
_set_activation
(
self
,
activation
):
try
:
return
self
.
_activation
.
get
(
activation
.
lower
())
except
KeyError
:
raise
AttributeError
(
f
"
Given activation
{
activation
}
is not supported in this model class.
"
)
def
_set_optimizer
(
self
,
optimizer
,
**
kwargs
):
try
:
opt_name
=
optimizer
.
lower
()
opt
=
self
.
_optimizer
.
get
(
opt_name
)
opt_kwargs
=
{}
if
opt_name
==
"
adam
"
:
opt_kwargs
=
select_from_dict
(
kwargs
,
[
"
lr
"
,
"
beta_1
"
,
"
beta_2
"
,
"
epsilon
"
,
"
decay
"
,
"
amsgrad
"
])
elif
opt_name
==
"
sgd
"
:
opt_kwargs
=
select_from_dict
(
kwargs
,
[
"
lr
"
,
"
momentum
"
,
"
decay
"
,
"
nesterov
"
])
return
opt
(
**
opt_kwargs
)
except
KeyError
:
raise
AttributeError
(
f
"
Given optimizer
{
optimizer
}
is not supported in this model class.
"
)
def
_set_regularizer
(
self
,
regularizer
,
**
kwargs
):
if
regularizer
is
None
or
(
isinstance
(
regularizer
,
str
)
and
regularizer
.
lower
()
==
"
none
"
):
return
None
try
:
reg_name
=
regularizer
.
lower
()
reg
=
self
.
_regularizer
.
get
(
reg_name
)
reg_kwargs
=
{}
if
reg_name
in
[
"
l1
"
,
"
l2
"
]:
reg_kwargs
=
select_from_dict
(
kwargs
,
reg_name
,
remove_none
=
True
)
if
reg_name
in
reg_kwargs
:
reg_kwargs
[
"
l
"
]
=
reg_kwargs
.
pop
(
reg_name
)
elif
reg_name
==
"
l1_l2
"
:
reg_kwargs
=
select_from_dict
(
kwargs
,
[
"
l1
"
,
"
l2
"
],
remove_none
=
True
)
return
reg
(
**
reg_kwargs
)
except
KeyError
:
raise
AttributeError
(
f
"
Given regularizer
{
regularizer
}
is not supported in this model class.
"
)
def
_set_dropout
(
self
,
activation
,
dropout_rate
):
if
dropout_rate
is
None
:
return
None
,
None
assert
0
<=
dropout_rate
<
1
return
self
.
_dropout
.
get
(
activation
,
keras
.
layers
.
Dropout
),
dropout_rate
def
_update_model_name
(
self
):
n_input
=
f
"
{
len
(
self
.
_input_shape
)
}
x
{
str
(
reduce
(
lambda
x
,
y
:
x
*
y
,
self
.
_input_shape
[
0
]))
}
"
n_output
=
str
(
self
.
_output_shape
)
if
isinstance
(
self
.
layer_configuration
,
tuple
)
and
len
(
self
.
layer_configuration
)
==
2
:
n_layer
,
n_hidden
=
self
.
layer_configuration
branch
=
[
f
"
{
n_hidden
}
"
for
_
in
range
(
n_layer
)]
else
:
branch
=
[
f
"
{
n
}
"
for
n
in
self
.
layer_configuration
]
concat
=
[]
n_neurons_concat
=
int
(
branch
[
-
1
])
*
len
(
self
.
_input_shape
)
for
exp
in
reversed
(
range
(
2
,
len
(
self
.
_input_shape
)
+
1
)):
n_neurons
=
self
.
_output_shape
**
exp
if
n_neurons
<
n_neurons_concat
:
if
len
(
concat
)
==
0
:
concat
.
append
(
f
"
1x
{
n_neurons
}
"
)
else
:
concat
.
append
(
str
(
n_neurons
))
self
.
model_name
+=
"
_
"
.
join
([
""
,
n_input
,
*
branch
,
*
concat
,
n_output
])
def
set_model
(
self
):
"""
Build the model.
"""
if
isinstance
(
self
.
layer_configuration
,
tuple
)
is
True
:
n_layer
,
n_hidden
=
self
.
layer_configuration
conf
=
[
n_hidden
for
_
in
range
(
n_layer
)]
else
:
assert
isinstance
(
self
.
layer_configuration
,
list
)
is
True
conf
=
self
.
layer_configuration
x_input
=
[]
x_in
=
[]
for
branch
in
range
(
len
(
self
.
_input_shape
)):
x_input_b
=
keras
.
layers
.
Input
(
shape
=
self
.
_input_shape
[
branch
])
x_input
.
append
(
x_input_b
)
x_in_b
=
keras
.
layers
.
Flatten
()(
x_input_b
)
for
layer
,
n_hidden
in
enumerate
(
conf
):
x_in_b
=
keras
.
layers
.
Dense
(
n_hidden
,
kernel_initializer
=
self
.
kernel_initializer
,
kernel_regularizer
=
self
.
kernel_regularizer
,
name
=
f
"
Dense_branch
{
branch
+
1
}
_
{
layer
+
1
}
"
)(
x_in_b
)
if
self
.
bn
is
True
:
x_in_b
=
keras
.
layers
.
BatchNormalization
()(
x_in_b
)
x_in_b
=
self
.
activation
(
name
=
f
"
{
self
.
activation_name
}
_branch
{
branch
+
1
}
_
{
layer
+
1
}
"
)(
x_in_b
)
if
self
.
dropout
is
not
None
:
x_in_b
=
self
.
dropout
(
self
.
dropout_rate
)(
x_in_b
)
x_in
.
append
(
x_in_b
)
x_concat
=
keras
.
layers
.
Concatenate
()(
x_in
)
n_neurons_concat
=
int
(
conf
[
-
1
])
*
len
(
self
.
_input_shape
)
layer_concat
=
0
for
exp
in
reversed
(
range
(
2
,
len
(
self
.
_input_shape
)
+
1
)):
n_neurons
=
self
.
_output_shape
**
exp
if
n_neurons
<
n_neurons_concat
:
layer_concat
+=
1
x_concat
=
keras
.
layers
.
Dense
(
n_neurons
,
name
=
f
"
Dense_
{
layer_concat
}
"
)(
x_concat
)
if
self
.
bn
is
True
:
x_concat
=
keras
.
layers
.
BatchNormalization
()(
x_concat
)
x_concat
=
self
.
activation
(
name
=
f
"
{
self
.
activation_name
}
_
{
layer_concat
}
"
)(
x_concat
)
if
self
.
dropout
is
not
None
:
x_concat
=
self
.
dropout
(
self
.
dropout_rate
)(
x_concat
)
x_concat
=
keras
.
layers
.
Dense
(
self
.
_output_shape
)(
x_concat
)
out
=
self
.
activation_output
(
name
=
f
"
{
self
.
activation_output_name
}
_output
"
)(
x_concat
)
self
.
model
=
keras
.
Model
(
inputs
=
x_input
,
outputs
=
[
out
])
print
(
self
.
model
.
summary
())
def
set_compile_options
(
self
):
# self.compile_options = {"loss": [keras.losses.mean_squared_error],
# "metrics": ["mse", "mae", var_loss]}
self
.
compile_options
=
{
"
loss
"
:
[
custom_loss
([
keras
.
losses
.
mean_squared_error
,
var_loss
],
loss_weights
=
[
2
,
1
])],
"
metrics
"
:
[
"
mse
"
,
"
mae
"
,
var_loss
]}
Loading