diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py
index 6c256f86c6289578030bd0f116315fd724eb9b7a..0bdd9b216073bd6d045233afb3fd945718117a98 100644
--- a/mlair/data_handler/data_handler_mixed_sampling.py
+++ b/mlair/data_handler/data_handler_mixed_sampling.py
@@ -283,21 +283,6 @@ class DataHandlerMixedSamplingWithClimateFirFilter(DataHandlerClimateFirFilter):
                                                              dh_transformation=dh_transformation[1], **kwargs)
         return {"filtered": transformation_filtered, "unfiltered": transformation_unfiltered}
 
-    def get_X_original(self):
-        if self.use_filter_branches is True:
-            X = []
-            for data in self._collection:
-                if hasattr(data, "filter_dim"):
-                    X_total = data.get_X()
-                    filter_dim = data.filter_dim
-                    for filter_name in data.filter_dim_order:
-                        X.append(X_total.sel({filter_dim: filter_name}, drop=True))
-                else:
-                    X.append(data.get_X())
-            return X
-        else:
-            return super().get_X_original()
-
 
 class DataHandlerMixedSamplingWithClimateAndFirFilter(DataHandlerMixedSamplingWithClimateFirFilter):
     # data_handler = DataHandlerMixedSamplingWithClimateFirFilterSingleStation
@@ -457,18 +442,3 @@ class DataHandlerMixedSamplingWithClimateAndFirFilter(DataHandlerMixedSamplingWi
         else:  # if no unfiltered meteo branch
             transformation_res["filtered_meteo"] = transformation_meteo
         return transformation_res if len(transformation_res) > 0 else None
-
-    def get_X_original(self):
-        if self.use_filter_branches is True:
-            X = []
-            for data in self._collection:
-                if hasattr(data, "filter_dim"):
-                    X_total = data.get_X()
-                    filter_dim = data.filter_dim
-                    for filter_name in data.filter_dim_order:
-                        X.append(X_total.sel({filter_dim: filter_name}, drop=True))
-                else:
-                    X.append(data.get_X())
-            return X
-        else:
-            return super().get_X_original()
diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py
index 997ecbf51740cce159d2339b589728b5e708de53..47ccc5510c8135745c518611504cd02900a1f883 100644
--- a/mlair/data_handler/data_handler_with_filter.py
+++ b/mlair/data_handler/data_handler_with_filter.py
@@ -116,6 +116,21 @@ class DataHandlerFilter(DefaultDataHandler):
         self.use_filter_branches = use_filter_branches
         super().__init__(*args, **kwargs)
 
+    def get_X_original(self):
+        if self.use_filter_branches is True:
+            X = []
+            for data in self._collection:
+                if hasattr(data, "filter_dim"):
+                    X_total = data.get_X()
+                    filter_dim = data.filter_dim
+                    for filter_name in data.filter_dim_order:
+                        X.append(X_total.sel({filter_dim: filter_name}, drop=True))
+                else:
+                    X.append(data.get_X())
+            return X
+        else:
+            return super().get_X_original()
+
 
 class DataHandlerFirFilterSingleStation(DataHandlerFilterSingleStation):
     """Data handler for a single station to be used by a superior data handler. Inputs are FIR filtered."""
diff --git a/mlair/model_modules/branched_input_networks.py b/mlair/model_modules/branched_input_networks.py
index 2c62c3cafc1537979e4a21bdb3bb6aa798e6e193..a7841f6aab031647e0a3a6a8af6b7c648179cbc3 100644
--- a/mlair/model_modules/branched_input_networks.py
+++ b/mlair/model_modules/branched_input_networks.py
@@ -1,11 +1,71 @@
 from functools import partial, reduce
+import copy
 
 from tensorflow import keras as keras
 
 from mlair import AbstractModelClass
-from mlair.helpers import select_from_dict
+from mlair.helpers import select_from_dict, to_list
 from mlair.model_modules.loss import var_loss
 from mlair.model_modules.recurrent_networks import RNN
+from mlair.model_modules.convolutional_networks import CNNfromConfig
+
+
+class BranchedInputCNN(CNNfromConfig):  # pragma: no cover
+    """A convolutional neural network with multiple input branches."""
+
+    def __init__(self, input_shape: list, output_shape: list, layer_configuration: list, optimizer="adam", **kwargs):
+
+        super().__init__([input_shape], output_shape, layer_configuration, optimizer=optimizer, **kwargs)
+
+    def set_model(self):
+
+        x_input = []
+        x_in = []
+        stop_pos = None
+
+        for branch in range(len(self._input_shape)):
+            print(branch)
+            shape_b = self._input_shape[branch]
+            x_input_b = keras.layers.Input(shape=shape_b, name=f"input_branch{branch + 1}")
+            x_input.append(x_input_b)
+            x_in_b = x_input_b
+            b_conf = copy.deepcopy(self.conf)
+
+            for pos, layer_opts in enumerate(b_conf):
+                print(layer_opts)
+                if layer_opts.get("type") == "Concatenate":
+                    if stop_pos is None:
+                        stop_pos = pos
+                    else:
+                        assert pos == stop_pos
+                    break
+                layer, layer_kwargs, follow_up_layer = self._extract_layer_conf(layer_opts)
+                x_in_b = layer(**layer_kwargs, name=f"{layer.__name__}_branch{branch + 1}_{pos + 1}")(x_in_b)
+                if follow_up_layer is not None:
+                    for follow_up in to_list(follow_up_layer):
+                        x_in_b = follow_up(name=f"{follow_up.__name__}_branch{branch + 1}_{pos + 1}")(x_in_b)
+                self._layer_save.append({"layer": layer, **layer_kwargs, "follow_up_layer": follow_up_layer,
+                                         "branch": branch})
+            x_in.append(x_in_b)
+
+        print("concat")
+        x_concat = keras.layers.Concatenate()(x_in)
+
+        if stop_pos is not None:
+            for pos, layer_opts in enumerate(self.conf[stop_pos + 1:]):
+                print(layer_opts)
+                layer, layer_kwargs, follow_up_layer = self._extract_layer_conf(layer_opts)
+                x_concat = layer(**layer_kwargs, name=f"{layer.__name__}_{pos + stop_pos + 1}")(x_concat)
+                if follow_up_layer is not None:
+                    for follow_up in to_list(follow_up_layer):
+                        x_concat = follow_up(name=f"{follow_up.__name__}_{pos + stop_pos + 1}")(x_concat)
+                self._layer_save.append({"layer": layer, **layer_kwargs, "follow_up_layer": follow_up_layer,
+                                         "branch": "concat"})
+
+        x_concat = keras.layers.Dense(self._output_shape)(x_concat)
+        out = self.activation_output(name=f"{self.activation_output_name}_output")(x_concat)
+        self.model = keras.Model(inputs=x_input, outputs=[out])
+        print(self.model.summary())
 
 
 class BranchedInputRNN(RNN):  # pragma: no cover
@@ -15,11 +75,6 @@ class BranchedInputRNN(RNN):  # pragma: no cover
 
         super().__init__([input_shape], output_shape, *args, **kwargs)
 
-        # apply to model
-        # self.set_model()
-        # self.set_compile_options()
-        # self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)
-
     def set_model(self):
         """
         Build the model.
diff --git a/mlair/model_modules/convolutional_networks.py b/mlair/model_modules/convolutional_networks.py
index d8eb6eb3403db13932a51274fdc1f563dbfb6ef3..3e87bb7c595e4a0f709ca73449043ad74e9b6e9d 100644
--- a/mlair/model_modules/convolutional_networks.py
+++ b/mlair/model_modules/convolutional_networks.py
@@ -4,20 +4,163 @@ __date__ = '2021-02-'
 from functools import reduce, partial
 
 from mlair.model_modules import AbstractModelClass
-from mlair.helpers import select_from_dict
+from mlair.helpers import select_from_dict, to_list
 from mlair.model_modules.loss import var_loss, custom_loss
 from mlair.model_modules.advanced_paddings import PadUtils, Padding2D, SymmetricPadding2D
 
 import tensorflow.keras as keras
 
 
+class CNNfromConfig(AbstractModelClass):
+
+    _activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"),
+                   "sigmoid": partial(keras.layers.Activation, "sigmoid"),
+                   "linear": partial(keras.layers.Activation, "linear"),
+                   "prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25)),
+                   "leakyrelu": partial(keras.layers.LeakyReLU)}
+    _initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform",
+                    "relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal(),
+                    "prelu": keras.initializers.he_normal()}
+    _optimizer = {"adam": keras.optimizers.Adam, "sgd": keras.optimizers.SGD}
+    _regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2}
+    _requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"]
+
+    """
+    Use this class like the following. Note that all keys must match the corresponding tf/keras keys of the layer
+
+    ```python
+    input_shape = [(65,1,9)]
+    output_shape = [(4, )]
+    layer_configuration=[
+        {"type": "Conv2D", "activation": "relu", "kernel_size": (1, 1), "filters": 8},
+        {"type": "Dropout", "rate": 0.2},
+        {"type": "Conv2D", "activation": "relu", "kernel_size": (5, 1), "filters": 16},
+        {"type": "Dropout", "rate": 0.2},
+        {"type": "MaxPooling2D", "pool_size": (8, 1), "strides": (1, 1)},
+        {"type": "Conv2D", "activation": "relu", "kernel_size": (1, 1), "filters": 16},
+        {"type": "Dropout", "rate": 0.2},
+        {"type": "Conv2D", "activation": "relu", "kernel_size": (5, 1), "filters": 32},
+        {"type": "Dropout", "rate": 0.2},
+        {"type": "MaxPooling2D", "pool_size": (8, 1), "strides": (1, 1)},
+        {"type": "Conv2D", "activation": "relu", "kernel_size": (1, 1), "filters": 32},
+        {"type": "Dropout", "rate": 0.2},
+        {"type": "Conv2D", "activation": "relu", "kernel_size": (5, 1), "filters": 64},
+        {"type": "Dropout", "rate": 0.2},
+        {"type": "MaxPooling2D", "pool_size": (8, 1), "strides": (1, 1)},
+        {"type": "Conv2D", "activation": "relu", "kernel_size": (1, 1), "filters": 64},
+        {"type": "Dropout", "rate": 0.2},
+        {"type": "Flatten"},
+        # {"type": "Dense", "units": 128, "activation": "relu"}
+    ]
+    model = CNNfromConfig(input_shape, output_shape, layer_configuration)
+    ```
+    """
+
+    def __init__(self, input_shape: list, output_shape: list, layer_configuration: list, optimizer="adam",
+                 batch_normalization=False, **kwargs):
+
+        assert len(input_shape) == 1
+        assert len(output_shape) == 1
+        super().__init__(input_shape[0], output_shape[0])
+
+        self.conf = layer_configuration
+        activation_output = kwargs.pop("activation_output", "linear")
+        self.activation_output = self._activation.get(activation_output)
+        self.activation_output_name = activation_output
+        self.kwargs = kwargs
+        self.bn = batch_normalization
+        self.optimizer = self._set_optimizer(optimizer, **kwargs)
+        self._layer_save = []
+
+        # apply to model
+        self.set_model()
+        self.set_compile_options()
+        self.set_custom_objects(loss=custom_loss([keras.losses.mean_squared_error, var_loss]), var_loss=var_loss)
+
+    def set_model(self):
+        x_input = keras.layers.Input(shape=self._input_shape)
+        x_in = x_input
+
+        for layer_opts in self.conf:
+            print(layer_opts)
+            layer, layer_kwargs, follow_up_layer = self._extract_layer_conf(layer_opts)
+            x_in = layer(**layer_kwargs)(x_in)
+            if follow_up_layer is not None:
+                for follow_up in to_list(follow_up_layer):
+                    x_in = follow_up()(x_in)
+            self._layer_save.append({"layer": layer, **layer_kwargs, "follow_up_layer": follow_up_layer})
+
+        x_in = keras.layers.Dense(self._output_shape)(x_in)
+        out = self.activation_output(name=f"{self.activation_output_name}_output")(x_in)
+        self.model = keras.Model(inputs=x_input, outputs=[out])
+        print(self.model.summary())
+
+    def _set_optimizer(self, optimizer, **kwargs):
+        try:
+            opt_name = optimizer.lower()
+            opt = self._optimizer.get(opt_name)
+            opt_kwargs = {}
+            if opt_name == "adam":
+                opt_kwargs = select_from_dict(kwargs, ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad"])
+            elif opt_name == "sgd":
+                opt_kwargs = select_from_dict(kwargs, ["lr", "momentum", "decay", "nesterov"])
+            return opt(**opt_kwargs)
+        except KeyError:
+            raise AttributeError(f"Given optimizer {optimizer} is not supported in this model class.")
+
+    def _set_regularizer(self, regularizer, **kwargs):
+        if regularizer is None or (isinstance(regularizer, str) and regularizer.lower() == "none"):
+            return None
+        try:
+            reg_name = regularizer.lower()
+            reg = self._regularizer.get(reg_name)
+            reg_kwargs = {}
+            if reg_name in ["l1", "l2"]:
+                reg_kwargs = select_from_dict(kwargs, reg_name, remove_none=True)
+                if reg_name in reg_kwargs:
+                    reg_kwargs["l"] = reg_kwargs.pop(reg_name)
+            elif reg_name == "l1_l2":
+                reg_kwargs = select_from_dict(kwargs, ["l1", "l2"], remove_none=True)
+            return reg(**reg_kwargs)
+        except KeyError:
+            raise AttributeError(f"Given regularizer {regularizer} is not supported in this model class.")
+
+    def set_compile_options(self):
+        # self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss])],
+        #                         "metrics": ["mse", "mae", var_loss]}
+        self.compile_options = {"loss": [keras.losses.mean_squared_error],
+                                "metrics": ["mse", "mae", var_loss]}
+
+    def _extract_layer_conf(self, layer_opts):
+        follow_up_layer = None
+        layer_type = layer_opts.pop("type")
+        layer = getattr(keras.layers, layer_type, None)
+        activation_type = layer_opts.pop("activation", None)
+        if activation_type is not None:
+            activation = self._activation.get(activation_type)
+            kernel_initializer = self._initializer.get(activation_type, "glorot_uniform")
+            layer_opts["kernel_initializer"] = kernel_initializer
+            follow_up_layer = activation
+            if self.bn is True:
+                another_layer = keras.layers.BatchNormalization
+                if activation_type in ["relu", "linear", "prelu", "leakyrelu"]:
+                    follow_up_layer = (another_layer, follow_up_layer)
+                else:
+                    follow_up_layer = (follow_up_layer, another_layer)
+        regularizer_type = layer_opts.pop("kernel_regularizer", None)
+        if regularizer_type is not None:
+            layer_opts["kernel_regularizer"] = self._set_regularizer(regularizer_type, **self.kwargs)
+        return layer, layer_opts, follow_up_layer
+
+
 class CNN(AbstractModelClass):  # pragma: no cover
 
     _activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"),
                    "sigmoid": partial(keras.layers.Activation, "sigmoid"),
                    "linear": partial(keras.layers.Activation, "linear"),
                    "selu": partial(keras.layers.Activation, "selu"),
-                   "prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25))}
+                   "prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25)),
+                   "leakyrelu": partial(keras.layers.LeakyReLU)}
     _initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform",
                     "relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal(),
                     "prelu": keras.initializers.he_normal()}
@@ -25,9 +168,67 @@ class CNN(AbstractModelClass):  # pragma: no cover
     _regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2}
     _requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"]
     _dropout = {"selu": keras.layers.AlphaDropout}
+    _pooling = {"max": keras.layers.MaxPooling2D, "average": keras.layers.AveragePooling2D,
+                "mean": keras.layers.AveragePooling2D}
+
+    """
+    Define CNN model as in the following examples:
+
+    * use same kernel for all layers and use in total 3 conv layers, no dropout or pooling is applied
+
+        ```python
+        model=CNN,
+        kernel_size=5,
+        n_layer=3,
+        dense_layer_configuration=[128, 64],
+        ```
+
+    * specify the kernel sizes, make sure len of kernel size parameter matches number of layers
+
+        ```python
+        model=CNN,
+        kernel_size=[3, 7, 11],
+        n_layer=3,
+        dense_layer_configuration=[128, 64],
+        ```
+
+    * use different number of filters in each layer (can be combined either with fixed or individual kernel sizes),
+      make sure that lengths match. Using layer_configuration always overwrites any value given to the n_layer
+      parameter.
+
+        ```python
+        model=CNN,
+        kernel_size=[3, 7, 11],
+        layer_configuration=[24, 48, 48],
+        ```
+
+    * now specify individual kernel sizes and number of filters for each layer
+
+        ```python
+        model=CNN,
+        layer_configuration=[(16, 3), (32, 7), (64, 11)],
+        dense_layer_configuration=[128, 64],
+        ```
+
+    * add also some dropout and pooling every 2nd layer, dropout is applied after the conv layer, pooling before. Note
+      that pooling will not be used in the init layer whereas dropout is already applied there.
+
+        ```python
+        model=CNN,
+        dropout_freq=2,
+        dropout=0.3,
+        pooling_type="max",
+        pooling_freq=2,
+        pooling_size=3,
+        layer_configuration=[(16, 3), (32, 7), (64, 11)],
+        dense_layer_configuration=[128, 64],
+        ```
+    """
 
     def __init__(self, input_shape: list, output_shape: list, activation="relu", activation_output="linear",
-                 optimizer="adam", regularizer=None, kernel_size=1, dropout=None, **kwargs):
+                 optimizer="adam", regularizer=None, kernel_size=7, dropout=None, dropout_freq=None, pooling_freq=None,
+                 pooling_type="max",
+                 n_layer=1, n_filter=10, layer_configuration=None, pooling_size=None,
+                 dense_layer_configuration=None, **kwargs):
 
         assert len(input_shape) == 1
         assert len(output_shape) == 1
@@ -42,12 +243,31 @@ class CNN(AbstractModelClass):  # pragma: no cover
         self.kernel_regularizer = self._set_regularizer(regularizer, **kwargs)
         self.kernel_size = kernel_size
         self.optimizer = self._set_optimizer(optimizer, **kwargs)
+        self.layer_configuration = (n_layer, n_filter, self.kernel_size) if layer_configuration is None else layer_configuration
+        self.dense_layer_configuration = dense_layer_configuration or []
+        self.pooling = self._set_pooling(pooling_type)
+        self.pooling_size = pooling_size
         self.dropout, self.dropout_rate = self._set_dropout(activation, dropout)
+        self.dropout_freq = self._set_layer_freq(dropout_freq)
+        self.pooling_freq = self._set_layer_freq(pooling_freq)
 
         # apply to model
         self.set_model()
         self.set_compile_options()
-        self.set_custom_objects(loss=custom_loss([keras.losses.mean_squared_error, var_loss]), var_loss=var_loss)
+        # self.set_custom_objects(loss=custom_loss([keras.losses.mean_squared_error, var_loss]), var_loss=var_loss)
+        self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)
+
+    def _set_pooling(self, pooling):
+        try:
+            return self._pooling.get(pooling.lower())
+        except KeyError:
+            raise AttributeError(f"Given pooling {pooling} is not supported in this model class.")
+
+    def _set_layer_freq(self, param):
+        param = 0 if param is None else param
+        assert 0 <= param
+        assert isinstance(param, int)
+        return param
 
     def _set_activation(self, activation):
         try:
@@ -91,6 +311,67 @@ class CNN(AbstractModelClass):  # pragma: no cover
         assert 0 <= dropout_rate < 1
         return self._dropout.get(activation, keras.layers.Dropout), dropout_rate
 
+    def set_model(self):
+        """
+        Build the model.
+ """ + if isinstance(self.layer_configuration, tuple) is True: + n_layer, n_hidden, kernel_size = self.layer_configuration + if isinstance(kernel_size, list): + assert len(kernel_size) == n_layer # use individual filter sizes for each layer + conf = [(n_hidden, kernel_size[i]) for i in range(n_layer)] + else: + assert isinstance(kernel_size, int) # use same filter size for all layers + conf = [(n_hidden, kernel_size) for _ in range(n_layer)] + else: + assert isinstance(self.layer_configuration, list) is True + if not isinstance(self.layer_configuration[0], tuple): + if isinstance(self.kernel_size, list): + assert len(self.kernel_size) == len(self.layer_configuration) # use individual filter sizes for each layer + conf = [(n_filter, self.kernel_size[i]) for i, n_filter in enumerate(self.layer_configuration)] + else: + assert isinstance(self.kernel_size, int) # use same filter size for all layers + conf = [(n_filter, self.kernel_size) for n_filter in self.layer_configuration] + else: + assert len(self.layer_configuration[0]) == 2 + conf = self.layer_configuration + + x_input = keras.layers.Input(shape=self._input_shape) + x_in = x_input + for layer, (n_filter, kernel_size) in enumerate(conf): + if self.pooling_size is not None and self.pooling_freq > 0 and layer % self.pooling_freq == 0 and layer > 0: + x_in = self.pooling((self.pooling_size, 1), strides=(1, 1), padding='valid')(x_in) + x_in = keras.layers.Conv2D(filters=n_filter, kernel_size=(kernel_size, 1), + kernel_initializer=self.kernel_initializer, + kernel_regularizer=self.kernel_regularizer)(x_in) + x_in = self.activation()(x_in) + if self.dropout is not None and self.dropout_freq > 0 and layer % self.dropout_freq == 0: + x_in = self.dropout(self.dropout_rate)(x_in) + + x_in = keras.layers.Flatten()(x_in) + for layer, n_hidden in enumerate(self.dense_layer_configuration): + if n_hidden < self._output_shape: + break + x_in = keras.layers.Dense(n_hidden, name=f"Dense_{len(conf) + layer + 1}", + kernel_initializer=self.kernel_initializer, )(x_in) + x_in = self.activation(name=f"{self.activation_name}_{len(conf) + layer + 1}")(x_in) + if self.dropout is not None: + x_in = self.dropout(self.dropout_rate)(x_in) + + x_in = keras.layers.Dense(self._output_shape)(x_in) + out = self.activation_output(name=f"{self.activation_output_name}_output")(x_in) + self.model = keras.Model(inputs=x_input, outputs=[out]) + print(self.model.summary()) + + def set_compile_options(self): + # self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss])], + # "metrics": ["mse", "mae", var_loss]} + self.compile_options = {"loss": [keras.losses.mean_squared_error], + "metrics": ["mse", "mae", var_loss]} + + +class CNN_16_32_64(CNN): + def set_model(self): """ Build the model. 
@@ -123,7 +404,3 @@ class CNN(AbstractModelClass):  # pragma: no cover
         x_in = keras.layers.Dense(self._output_shape)(x_in)
         out = self.activation_output(name=f"{self.activation_output_name}_output")(x_in)
         self.model = keras.Model(inputs=x_input, outputs=[out])
-
-    def set_compile_options(self):
-        self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss])],
-                                "metrics": ["mse", "mae", var_loss]}
diff --git a/mlair/run_modules/experiment_setup.py b/mlair/run_modules/experiment_setup.py
index aca5f583bff181281aee9104df6750da61ad4f93..df797ffc23370bf4f45bb2b4f76e5f71e9bd030f 100644
--- a/mlair/run_modules/experiment_setup.py
+++ b/mlair/run_modules/experiment_setup.py
@@ -389,7 +389,7 @@ class ExperimentSetup(RunEnvironment):
             self._set_param("neighbors", ["DEBW030"])  # TODO: just for testing
 
         # set competitors
-        if model_display_name is not None and model_display_name in competitors:
+        if model_display_name is not None and competitors is not None and model_display_name in competitors:
             raise IndexError(f"Given model_display_name {model_display_name} is also present in the competitors "
                              f"variable {competitors}. To assure a proper workflow it is required to have unique names "
                              f"for each model and competitor. Please use a different model display name or competitor.")
diff --git a/mlair/run_modules/model_setup.py b/mlair/run_modules/model_setup.py
index 98263eb732d8067fba0950c7a4882fb3ef020995..4e9f8fa4439e9885a6c16c2b2eccfee2c97fd936 100644
--- a/mlair/run_modules/model_setup.py
+++ b/mlair/run_modules/model_setup.py
@@ -172,6 +172,7 @@ class ModelSetup(RunEnvironment):
 
     def report_model(self):
         # report model settings
+        _f = self._clean_name
         model_settings = self.model.get_settings()
         model_settings.update(self.model.compile_options)
         model_settings.update(self.model.optimizer.get_config())
@@ -180,9 +181,12 @@ class ModelSetup(RunEnvironment):
             if v is None:
                 continue
             if isinstance(v, list):
-                v = ",".join(self._clean_name(str(u)) for u in v)
+                if isinstance(v[0], dict):
+                    v = ["{" + vi + "}" for vi in [",".join(f"{_f(str(uk))}:{_f(str(uv))}" for uk, uv in d.items()) for d in v]]
+                else:
+                    v = ",".join(_f(str(u)) for u in v)
             if "<" in str(v):
-                v = self._clean_name(str(v))
+                v = _f(str(v))
             df.loc[k] = str(v)
         df.loc["count params"] = str(self.model.count_params())
         df.sort_index(inplace=True)
@@ -202,5 +206,8 @@ class ModelSetup(RunEnvironment):
     @staticmethod
     def _clean_name(orig_name: str):
         mod_name = re.sub(r'^{0}'.format(re.escape("<")), '', orig_name).replace("'", "").split(" ")
-        mod_name = mod_name[1] if any(map(lambda x: x in mod_name[0], ["class", "function", "method"])) else mod_name[0]
-        return mod_name[:-1] if mod_name[-1] == ">" else mod_name
+        mod_name = mod_name[1] if any(map(lambda x: x in mod_name[0], ["class", "function", "method"])) else mod_name
+        mod_name = mod_name[0].split(".")[-1] if any(
+            map(lambda x: x in mod_name[0], ["tensorflow", "keras"])) else mod_name
+        mod_name = mod_name[:-1] if mod_name[-1] == ">" else "".join(mod_name)
+        return mod_name.split(".")[-1] if any(map(lambda x: x in mod_name, ["tensorflow", "keras"])) else mod_name
diff --git a/test/test_run_modules/test_model_setup.py b/test/test_run_modules/test_model_setup.py
index 7cefd0e58f5b9b0787bafddffe1ad07e4851a068..60b37207ceefc4088b33fa002dac9db7c6c35399 100644
--- a/test/test_run_modules/test_model_setup.py
+++ b/test/test_run_modules/test_model_setup.py
@@ -126,6 +126,14 @@ class TestModelSetup:
     def test_init(self):
         pass
 
+    def test_clean_name(self, setup):
+        in_str = "<tensorflow.python.keras.initializers.initializers_v2.HeNormal object at 0x7fecfa0da9b0>"
+        assert setup._clean_name(in_str) == "HeNormal"
+        in_str = "<class 'tensorflow.python.keras.layers.convolutional.Conv2D'>"
+        assert setup._clean_name(in_str) == "Conv2D"
+        in_str = "default"
+        assert setup._clean_name(in_str) == "default"
+
 
 class DummyData:
 
@@ -141,4 +149,4 @@ class DummyData:
     def get_Y(self, upsampling=False, as_numpy=True):
         Y1 = np.random.randint(0, 10, size=(self.number_of_samples, 5))  # samples, window
         Y2 = np.random.randint(21, 30, size=(self.number_of_samples, 3))  # samples, window
-        return [Y1, Y2]
\ No newline at end of file
+        return [Y1, Y2]