diff --git a/src/helpers.py b/src/helpers.py
index 424b2fb519726adde9d8d30fb610379f9b4dfed3..342a0b5ef77d8b286291792aa007f59c8c7b09b2 100644
--- a/src/helpers.py
+++ b/src/helpers.py
@@ -2,7 +2,76 @@
 __author__ = 'Lukas Leufen'
 __date__ = '2019-10-21'
 
+import logging
+import keras
+import keras.backend as K
+import math
+from typing import Union
+import numpy as np
+
+
 def to_list(arg):
     if not isinstance(arg, list):
         arg = [arg]
     return arg
+
+
+def l_p_loss(power: int):
+    """
+    Calculate the L<p> loss for a given power p. L1 (p=1) is equal to the mean absolute error (MAE), L2 (p=2) to the
+    mean squared error (MSE), and so on.
+    :param power: power to which the absolute error is raised
+    :return: loss function for the given power
+    """
+    def loss(y_true, y_pred):
+        return K.mean(K.pow(K.abs(y_pred - y_true), power), axis=-1)
+    return loss
+
+
+class LearningRateDecay(keras.callbacks.History):
+    """
+    Decay the learning rate during model training. Start with a base learning rate and lower it after every
+    n (=epochs_drop) epochs by the factor drop in (0, 1]; drop = 1 means no decay of the learning rate.
+    """
+
+    def __init__(self, base_lr: float = 0.01, drop: float = 0.96, epochs_drop: int = 8):
+        super().__init__()
+        self.lr = {'lr': []}
+        self.base_lr = self.check_param(base_lr, 'base_lr')
+        self.drop = self.check_param(drop, 'drop')
+        self.epochs_drop = self.check_param(epochs_drop, 'epochs_drop', upper=None)
+
+    @staticmethod
+    def check_param(value: float, name: str, lower: Union[float, None] = 0, upper: Union[float, None] = 1):
+        """
+        Check if the given value lies in the interval. The left (lower) endpoint is open, the right (upper) endpoint
+        is closed. To check only one side of the interval, set the other endpoint to None. If both endpoints are set
+        to None, the value is returned without any check.
+        :param value: value to check
+        :param name: name of the variable to display in the error message
+        :param lower: left (lower) endpoint of the interval, open
+        :param upper: right (upper) endpoint of the interval, closed
+        :return: the unchanged value, or raise ValueError if it is out of range
+        """
+        if lower is None:
+            lower = -np.inf
+        if upper is None:
+            upper = np.inf
+        if lower < value <= upper:
+            return value
+        else:
+            raise ValueError(f"{name} is out of allowed range ({lower}, {upper}{')' if upper == np.inf else ']'}: "
+                             f"{name}={value}")
+
+    def on_epoch_begin(self, epoch: int, logs=None):
+        """
+        Lower the learning rate every epochs_drop epochs by the factor drop.
+        :param epoch: current epoch
+        :param logs: not used
+        :return: the updated Keras learning rate
+        """
+        current_lr = self.base_lr * math.pow(self.drop, math.floor(epoch / self.epochs_drop))
+        K.set_value(self.model.optimizer.lr, current_lr)
+        self.lr['lr'].append(current_lr)
+        logging.info(f"Set learning rate to {current_lr}")
+        return K.get_value(self.model.optimizer.lr)
diff --git a/test/test_helpers.py b/test/test_helpers.py
new file mode 100644
index 0000000000000000000000000000000000000000..e4a23d15b6ef497849af28ffd783b6f51c6c5b5d
--- /dev/null
+++ b/test/test_helpers.py
@@ -0,0 +1,57 @@
+import pytest
+from src.helpers import l_p_loss, LearningRateDecay
+import logging
+import os
+import keras
+import numpy as np
+
+
+class TestLoss:
+
+    def test_l_p_loss(self):
+        model = keras.Sequential()
+        model.add(keras.layers.Lambda(lambda x: x, input_shape=(None, )))
+        model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(2))
+        hist = model.fit(np.array([1, 0, 2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=1)
+        assert hist.history['loss'][0] == 1.25
+        model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(3))
+        hist = model.fit(np.array([1, 0, -2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=1)
+        assert hist.history['loss'][0] == 2.25
+
+
+class TestLearningRateDecay:
+
+    def test_init(self):
+        lr_decay = LearningRateDecay()
+        assert lr_decay.lr == {'lr': []}
+        assert lr_decay.base_lr == 0.01
+        assert lr_decay.drop == 0.96
+        assert lr_decay.epochs_drop == 8
+
+    def test_check_param(self):
+        lr_decay = object.__new__(LearningRateDecay)
+        assert lr_decay.check_param(1, "tester") == 1
+        assert lr_decay.check_param(0.5, "tester") == 0.5
+        with pytest.raises(ValueError) as e:
+            lr_decay.check_param(0, "tester")
+        assert "tester is out of allowed range (0, 1]: tester=0" in e.value.args[0]
+        with pytest.raises(ValueError) as e:
+            lr_decay.check_param(1.5, "tester")
+        assert "tester is out of allowed range (0, 1]: tester=1.5" in e.value.args[0]
+        assert lr_decay.check_param(1.5, "tester", upper=None) == 1.5
+        with pytest.raises(ValueError) as e:
+            lr_decay.check_param(0, "tester", upper=None)
+        assert "tester is out of allowed range (0, inf): tester=0" in e.value.args[0]
+        assert lr_decay.check_param(0.5, "tester", lower=None) == 0.5
+        with pytest.raises(ValueError) as e:
+            lr_decay.check_param(0.5, "tester", lower=None, upper=0.2)
+        assert "tester is out of allowed range (-inf, 0.2]: tester=0.5" in e.value.args[0]
+        assert lr_decay.check_param(10, "tester", upper=None, lower=None)
+
+    def test_on_epoch_begin(self):
+        lr_decay = LearningRateDecay(base_lr=0.02, drop=0.95, epochs_drop=2)
+        model = keras.Sequential()
+        model.add(keras.layers.Dense(1, input_dim=1))
+        model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(2))
+        model.fit(np.array([1, 0, 2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=5, callbacks=[lr_decay])
+        assert lr_decay.lr['lr'] == [0.02, 0.02, 0.02*0.95, 0.02*0.95, 0.02*0.95*0.95]
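Note for reviewers: as a quick cross-check of the schedule implemented by LearningRateDecay.on_epoch_begin (lr = base_lr * drop ** floor(epoch / epochs_drop)), here is a minimal standalone sketch that reproduces the values asserted in test_on_epoch_begin without touching Keras. The helper name step_decay and the tolerance check are illustrative only and not part of this change.

    import math

    def step_decay(epoch, base_lr=0.02, drop=0.95, epochs_drop=2):
        # Same step-decay formula as LearningRateDecay.on_epoch_begin; epoch is 0-based.
        return base_lr * math.pow(drop, math.floor(epoch / epochs_drop))

    schedule = [step_decay(epoch) for epoch in range(5)]
    # Expected values, as asserted in test_on_epoch_begin:
    expected = [0.02, 0.02, 0.02 * 0.95, 0.02 * 0.95, 0.02 * 0.95 * 0.95]
    assert all(abs(a - b) < 1e-12 for a, b in zip(schedule, expected))
    print(schedule)  # approximately [0.02, 0.02, 0.019, 0.019, 0.01805]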