"""Unit tests for the utilities provided by ``src.helpers``."""

import datetime as dt
import logging
import math
import os
import platform
import time

import keras
import mock
import numpy as np
import pytest

# NOTE(review): the wildcard import is kept on purpose. Besides the helpers
# under test it appears to be the only source of ``xr`` used further down
# (no explicit xarray import exists in this module) -- confirm before
# replacing it with explicit names. It is placed after the explicit stdlib
# imports above so that its bindings still take precedence, as before.
from src.helpers import *


class TestToList:
    """Checks for ``to_list``."""

    def test_to_list(self):
        """Strings are wrapped as one element; lists are returned as equal lists."""
        assert to_list('a') == ['a']
        assert to_list('abcd') == ['abcd']
        assert to_list([1, 2, 3]) == [1, 2, 3]
        assert to_list([45]) == [45]


class TestCheckPath:
    """Checks for ``check_path_and_create``."""

    def test_check_path_and_create(self, caplog):
        """The path is created on the first call and reported as existing on the second."""
        caplog.set_level(logging.DEBUG)
        path = 'data/test'
        # Precondition stays outside the try block so that a directory which
        # existed before the test is never removed by the cleanup below.
        assert not os.path.exists(path)
        try:
            check_path_and_create(path)
            assert os.path.exists(path)
            assert caplog.messages[0] == "Created path: data/test"
            check_path_and_create(path)
            assert caplog.messages[1] == "Path already exists: data/test"
        finally:
            # Always clean up; otherwise one failing assertion would make
            # every later run fail at the precondition above.
            if os.path.isdir(path):
                os.rmdir(path)


class TestLoss:
    """Checks for the ``l_p_loss`` loss factory."""

    def test_l_p_loss(self):
        """Train an identity model for one epoch and compare the reported loss."""
        model = keras.Sequential()
        # Identity layer: the prediction equals the input, so the loss only
        # depends on the difference between the two arrays passed to fit().
        model.add(keras.layers.Lambda(lambda x: x, input_shape=(None,)))

        model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(2))
        hist = model.fit(np.array([1, 0, 2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=1)
        # approx() guards against backend float precision differences.
        assert hist.history['loss'][0] == pytest.approx(1.25)

        model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(3))
        hist = model.fit(np.array([1, 0, -2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=1)
        assert hist.history['loss'][0] == pytest.approx(2.25)


class TestTimeTracking:
    """Checks for the ``TimeTracking`` stopwatch / context manager."""

    def test_init(self):
        """By default the tracker starts immediately; ``start=False`` defers it."""
        t = TimeTracking()
        assert t.start is not None
        assert t.start < time.time()
        assert t.end is None
        t2 = TimeTracking(start=False)
        assert t2.start is None

    def test__start(self):
        """``_start`` sets a start time that lies in the past."""
        t = TimeTracking(start=False)
        t._start()
        assert t.start < time.time()

    def test__end(self):
        """``_end`` sets an end time later than the start time."""
        t = TimeTracking()
        t._end()
        assert t.end > t.start

    def test__duration(self):
        """The duration grows while running and is constant once ended."""
        t = TimeTracking()
        d1 = t._duration()
        assert d1 > 0
        d2 = t._duration()
        assert d2 > d1
        t._end()
        d3 = t._duration()
        assert d3 > d2
        assert d3 == t._duration()

    def test_repr(self):
        """The representation is the ceiled duration formatted as a timedelta."""
        t = TimeTracking()
        t._end()
        duration = t._duration()
        expected = f"{dt.timedelta(seconds=math.ceil(duration))} (hh:mm:ss)"
        assert repr(t).rstrip() == expected.rstrip()

    def test_run(self):
        """``run`` starts a tracker that was created without starting."""
        t = TimeTracking(start=False)
        assert t.start is None
        t.run()
        assert t.start is not None

    def test_stop(self):
        """``stop`` ends tracking, refuses a second stop and works again after ``run``."""
        t = TimeTracking()
        assert t.end is None
        duration = t.stop(get_duration=True)
        assert duration == t._duration()
        with pytest.raises(AssertionError) as e:
            t.stop()
        assert "Time was already stopped" in e.value.args[0]
        t.run()
        assert t.end is None
        # Without get_duration the call returns nothing but still sets end.
        assert t.stop() is None
        assert t.end is not None

    def test_duration(self):
        """``duration`` is available while running and matches the stopped value."""
        t = TimeTracking()
        # Query the public accessor; asserting on the tracker object itself
        # would always pass and verify nothing.
        duration = t.duration()
        assert duration is not None
        duration = t.stop(get_duration=True)
        assert duration == t.duration()

    def test_enter_exit(self, caplog):
        """Used as a context manager the tracker runs inside and logs on exit."""
        caplog.set_level(logging.INFO)
        with TimeTracking() as t:
            assert t.start is not None
            assert t.end is None
        # The log record is checked after the with block has been left.
        expression = PyTestRegex(r"undefined job finished after \d+:\d+:\d+ \(hh:mm:ss\)")
        assert caplog.record_tuples[-1] == ('root', 20, expression)


class TestPrepareHost:
    """Checks for ``prepare_host`` with patched host name, user and file system."""

    # mock.patch decorators are applied bottom-up, so the mocks arrive in the
    # order: os.path.exists, os.getlogin, socket.gethostname.
    @mock.patch("socket.gethostname", side_effect=["linux-aa9b", "ZAM144", "zam347", "jrtest", "jwtest"])
    @mock.patch("os.getlogin", return_value="testUser")
    @mock.patch("os.path.exists", return_value=True)
    def test_prepare_host(self, mock_path, mock_user, mock_host):
        """Each patched host name in turn yields its expected data path."""
        path = prepare_host()
        assert path == "/home/testUser/machinelearningtools/data/toar_daily/"
        path = prepare_host()
        assert path == "/home/testUser/Data/toar_daily/"
        path = prepare_host()
        assert path == "/home/testUser/Data/toar_daily/"
        path = prepare_host()
        assert path == "/p/project/cjjsc42/testUser/DATA/toar_daily/"
        path = prepare_host()
        assert path == "/p/home/jusers/testUser/juwels/intelliaq/DATA/toar_daily/"

    @mock.patch("socket.gethostname", return_value="NotExistingHostName")
    @mock.patch("os.getlogin", return_value="zombie21")
    def test_error_handling(self, mock_user, mock_host):
        """Unknown hosts raise OSError; a known host without its path raises NotADirectoryError."""
        with pytest.raises(OSError) as e:
            prepare_host()
        assert "unknown host 'NotExistingHostName'" in e.value.args[0]
        # NOTE(review): the second check is skipped on nodes whose name
        # contains this runner id -- presumably the CI runner; confirm why.
        if "runner-6HmDp9Qd-project-2411-concurrent" not in platform.node():
            mock_host.return_value = "linux-aa9b"
            with pytest.raises(NotADirectoryError) as e:
                prepare_host()
            assert "does not exist for host 'linux-aa9b'" in e.value.args[0]


class TestSetExperimentName:
    """Checks for ``set_experiment_name``."""

    def test_set_experiment(self):
        """Default and explicit date/path arguments produce the expected name and path."""
        exp_name, exp_path = set_experiment_name()
        assert exp_name == "TestExperiment"
        assert exp_path == os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "TestExperiment"))
        exp_name, exp_path = set_experiment_name(experiment_date="2019-11-14", experiment_path="./test2")
        assert exp_name == "2019-11-14_network"
        assert exp_path == os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test2", exp_name))

    def test_set_experiment_from_sys(self):
        """Passing only a date yields the ``<date>_network`` experiment name."""
        exp_name, _ = set_experiment_name(experiment_date="2019-11-14")
        assert exp_name == "2019-11-14_network"


class TestPytestRegex:
    """Checks for the ``PyTestRegex`` comparison helper."""

    @pytest.fixture
    def regex(self):
        """Provide a ``PyTestRegex`` built from the pattern ``teststring``."""
        return PyTestRegex("teststring")

    def test_pytest_regex_init(self, regex):
        """The pattern is stored on the compiled ``_regex`` attribute."""
        assert regex._regex.pattern == "teststring"

    def test_pytest_regex_eq(self, regex):
        """Equality holds for a string starting with the pattern, not for a non-matching one."""
        assert regex == "teststringabcd"
        assert regex != "teststgabcd"

    def test_pytest_regex_repr(self, regex):
        """The representation is the raw pattern."""
        assert repr(regex) == "teststring"


class TestDictToXarray:
    """Checks for ``dict_to_xarray``."""

    def test_dict_to_xarray(self):
        """Two (2, 3) arrays are combined into one DataArray along a new dimension."""
        array1 = xr.DataArray(np.random.randn(2, 3), dims=('x', 'y'), coords={'x': [10, 20]})
        array2 = xr.DataArray(np.random.randn(2, 3), dims=('x', 'y'), coords={'x': [10, 20]})
        d = {"number1": array1, "number2": array2}
        res = dict_to_xarray(d, "merge_dim")
        assert isinstance(res, xr.DataArray)
        assert sorted(list(res.coords)) == ["merge_dim", "x"]
        assert res.shape == (2, 2, 3)


class TestFloatRound:
    """Checks for ``float_round``."""

    def test_float_round_ceil(self):
        """Without further arguments values are rounded up to the next integer."""
        assert float_round(4.6) == 5
        assert float_round(239.3992) == 240

    def test_float_round_decimals(self):
        """The second argument selects the number of decimals to round up to."""
        assert float_round(23.0091, 2) == 23.01
        assert float_round(23.1091, 3) == 23.11

    def test_float_round_type(self):
        """A custom rounding function can be passed as third argument."""
        assert float_round(34.9221, 2, math.floor) == 34.92
        assert float_round(34.9221, 0, math.floor) == 34.
        assert float_round(34.9221, 2, round) == 34.92
        assert float_round(34.9221, 0, round) == 35.

    def test_float_round_negative(self):
        """Negative values: floor moves away from zero, the default towards zero."""
        assert float_round(-34.9221, 2, math.floor) == -34.93
        assert float_round(-34.9221, 0, math.floor) == -35.
        assert float_round(-34.9221, 2) == -34.92
        assert float_round(-34.9221, 0) == -34.