import logging
import os
import platform
import keras
import mock
import numpy as np
import pytest
from src.helpers import *
class TestToList:
    """Checks for the ``to_list`` helper."""

    def test_to_list(self):
        """A string is wrapped as a single element; a list input compares equal to the result."""
        # (input, expected result) pairs; strings must not be split into characters
        expectations = (
            ('a', ['a']),
            ('abcd', ['abcd']),
            ([1, 2, 3], [1, 2, 3]),
            ([45], [45]),
        )
        for given, wanted in expectations:
            assert to_list(given) == wanted
class TestCheckPath:
    """Tests for ``check_path_and_create``."""

    def test_check_path_and_create(self, caplog):
        """The first call creates the directory, the second reports that it exists.

        Both calls are checked through the debug messages captured by ``caplog``.
        The directory is removed again even if an assertion fails, so a broken
        run does not poison the precondition check of the next run.
        """
        caplog.set_level(logging.DEBUG)
        path = 'data/test'
        # Precondition stays outside the try block: if the path already exists
        # it was not created by this test and must not be removed by it.
        assert not os.path.exists(path)
        try:
            check_path_and_create(path)
            assert os.path.exists(path)
            assert caplog.messages[0] == "Created path: data/test"
            check_path_and_create(path)
            assert caplog.messages[1] == "Path already exists: data/test"
        finally:
            # Guarded so that a failed creation surfaces as the assertion error
            # above instead of being masked by an error from rmdir.
            if os.path.isdir(path):
                os.rmdir(path)
class TestLoss:
    """Tests for the ``l_p_loss`` loss factory."""

    def test_l_p_loss(self):
        """Fit an identity model for one epoch and check the reported loss for p=2 and p=3.

        The model is a single pass-through ``Lambda`` layer, so the prediction
        equals the input and the loss depends only on the two arrays given to
        ``fit``. Losses are compared with ``pytest.approx`` because they are
        floating-point results computed by the backend.
        """
        model = keras.Sequential()
        model.add(keras.layers.Lambda(lambda x: x, input_shape=(None,)))

        model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(2))
        hist = model.fit(np.array([1, 0, 2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=1)
        # Expected value is consistent with mean(|difference| ** 2): (0 + 1 + 4 + 0) / 4
        # NOTE(review): presumed definition of l_p_loss - confirm against src.helpers
        assert hist.history['loss'][0] == pytest.approx(1.25)

        model.compile(optimizer=keras.optimizers.Adam(), loss=l_p_loss(3))
        hist = model.fit(np.array([1, 0, -2, 0.5]), np.array([1, 1, 0, 0.5]), epochs=1)
        # Same reasoning with p=3: (0 + 1 + 8 + 0) / 4
        assert hist.history['loss'][0] == pytest.approx(2.25)
class TestTimeTracking:
    """Tests for ``TimeTracking``: construction, private helpers, public API and context manager use."""

    def test_init(self):
        """A default tracker has a start time and no end time; ``start=False`` leaves start unset."""
        t = TimeTracking()
        assert t.start is not None
        assert t.start < time.time()
        assert t.end is None
        t2 = TimeTracking(start=False)
        assert t2.start is None

    def test__start(self):
        """``_start`` sets a start time that lies in the past."""
        t = TimeTracking(start=False)
        t._start()
        assert t.start < time.time()

    def test__end(self):
        """``_end`` sets an end time later than the start time."""
        t = TimeTracking()
        t._end()
        assert t.end > t.start

    def test__duration(self):
        """``_duration`` keeps growing while running and is frozen once ended."""
        t = TimeTracking()
        d1 = t._duration()
        assert d1 > 0
        d2 = t._duration()
        assert d2 > d1
        t._end()
        d3 = t._duration()
        assert d3 > d2
        # after _end() repeated calls must give the identical value
        assert d3 == t._duration()

    def test_repr(self):
        """The representation is the duration rounded up to whole seconds plus a unit hint."""
        t = TimeTracking()
        t._end()
        duration = t._duration()
        assert repr(t).rstrip() == f"{dt.timedelta(seconds=math.ceil(duration))} (hh:mm:ss)".rstrip()

    def test_run(self):
        """``run`` starts a tracker that was created without starting."""
        t = TimeTracking(start=False)
        assert t.start is None
        t.run()
        assert t.start is not None

    def test_stop(self):
        """``stop`` optionally returns the duration, rejects a second stop, and works again after ``run``."""
        t = TimeTracking()
        assert t.end is None
        duration = t.stop(get_duration=True)
        assert duration == t._duration()
        with pytest.raises(AssertionError) as e:
            t.stop()
        # checked after the with block: code following the raising call inside it would never run
        assert "Time was already stopped" in e.value.args[0]
        t.run()
        assert t.end is None
        assert t.stop() is None
        assert t.end is not None

    def test_duration(self):
        """``duration`` yields a value while running and matches the value returned by ``stop``."""
        t = TimeTracking()
        # Call the accessor itself. The previous version assigned the tracker
        # object, which made the following ``is not None`` check vacuous.
        duration = t.duration()
        assert duration is not None
        duration = t.stop(get_duration=True)
        assert duration == t.duration()

    def test_enter_exit(self, caplog):
        """Used as a context manager the tracker is running inside the block and logs on exit."""
        caplog.set_level(logging.INFO)
        with TimeTracking() as t:
            assert t.start is not None
            assert t.end is None
        expression = PyTestRegex(r"undefined job finished after \d+:\d+:\d+ \(hh:mm:ss\)")
        # 20 is the numeric value of logging.INFO
        assert caplog.record_tuples[-1] == ('root', 20, expression)
class TestPrepareHost:
    """Tests for ``prepare_host`` with hostname, login name and path checks mocked out."""

    @mock.patch("socket.gethostname", side_effect=["linux-aa9b", "ZAM144", "zam347", "jrtest", "jwtest"])
    @mock.patch("os.getlogin", return_value="testUser")
    @mock.patch("os.path.exists", return_value=True)
    def test_prepare_host(self, mock_path, mock_user, mock_host):
        """Each mocked hostname maps to its expected data path for user ``testUser``.

        ``mock.patch`` decorators inject their mocks bottom-up, so the first
        argument is the ``os.path.exists`` mock and the last one is the
        ``socket.gethostname`` mock. The hostname mock returns the next entry
        of its ``side_effect`` list on every call.
        NOTE(review): the assertions below assume ``prepare_host`` asks for the
        hostname exactly once per call - confirm against src.helpers.
        """
        path = prepare_host()
        assert path == "/home/testUser/machinelearningtools/data/toar_daily/"
        path = prepare_host()
        assert path == "/home/testUser/Data/toar_daily/"
        path = prepare_host()
        assert path == "/home/testUser/Data/toar_daily/"
        path = prepare_host()
        assert path == "/p/project/cjjsc42/testUser/DATA/toar_daily/"
        path = prepare_host()
        assert path == "/p/home/jusers/testUser/juwels/intelliaq/DATA/toar_daily/"

    @mock.patch("socket.gethostname", return_value="NotExistingHostName")
    @mock.patch("os.getlogin", return_value="zombie21")
    def test_error_handling(self, mock_user, mock_host):
        """An unknown host raises ``OSError``; a known host with a missing path raises ``NotADirectoryError``."""
        with pytest.raises(OSError) as e:
            prepare_host()
        assert "unknown host 'NotExistingHostName'" in e.value.args[0]
        # The second check is skipped on machines whose node name contains this runner id.
        if "runner-6HmDp9Qd-project-2411-concurrent" not in platform.node():
            # os.path.exists is not mocked in this test, so the real file system is consulted
            mock_host.return_value = "linux-aa9b"
            with pytest.raises(NotADirectoryError) as e:
                prepare_host()
            assert "does not exist for host 'linux-aa9b'" in e.value.args[0]
class TestSetExperimentName:
    """Tests for ``set_experiment_name``."""

    def test_set_experiment(self):
        """Check name and absolute path for the default call and for an explicit date and path."""
        # every expected path is resolved relative to the parent of this file's directory
        parent_dir = os.path.join(os.path.dirname(__file__), "..")

        name, location = set_experiment_name()
        assert name == "TestExperiment"
        assert location == os.path.abspath(os.path.join(parent_dir, "TestExperiment"))

        name, location = set_experiment_name(experiment_date="2019-11-14", experiment_path="./test2")
        assert name == "2019-11-14_network"
        assert location == os.path.abspath(os.path.join(parent_dir, "test2", name))

    def test_set_experiment_from_sys(self):
        """With only a date given, the name is the date followed by ``_network``."""
        name, _ = set_experiment_name(experiment_date="2019-11-14")
        assert name == "2019-11-14_network"
class TestPytestRegex:
    """Tests for the ``PyTestRegex`` comparison helper."""

    @pytest.fixture
    def regex(self):
        """Provide a ``PyTestRegex`` built from the plain pattern ``teststring``."""
        pattern = "teststring"
        return PyTestRegex(pattern)

    def test_pytest_regex_init(self, regex):
        """The pattern given at construction is stored on the ``_regex`` attribute."""
        stored = regex._regex
        assert stored.pattern == "teststring"

    def test_pytest_regex_eq(self, regex):
        """A string starting with the pattern compares equal; one without it compares unequal."""
        assert regex == "teststringabcd"
        assert regex != "teststgabcd"

    def test_pytest_regex_repr(self, regex):
        """The representation is the bare pattern text."""
        assert repr(regex) == "teststring"
class TestDictToXarray:
    """Tests for ``dict_to_xarray``."""

    def test_dict_to_xarray(self):
        """Two equally shaped arrays are combined into one ``DataArray`` along a new dimension.

        Only type, coordinate names and shape are checked, so the random
        content of the input arrays does not influence the outcome.
        """
        array1 = xr.DataArray(np.random.randn(2, 3), dims=('x', 'y'), coords={'x': [10, 20]})
        array2 = xr.DataArray(np.random.randn(2, 3), dims=('x', 'y'), coords={'x': [10, 20]})
        d = {"number1": array1, "number2": array2}
        res = dict_to_xarray(d, "merge_dim")
        # isinstance is the idiomatic type check
        assert isinstance(res, xr.DataArray)
        # iterating the coordinates yields their names; no intermediate list is needed
        assert sorted(res.coords) == ["merge_dim", "x"]
        # two inputs of shape (2, 3) give a leading axis of length 2
        assert res.shape == (2, 2, 3)
class TestFloatRound:
    """Tests for ``float_round``; every case is an (arguments, expected value) pair."""

    def test_float_round_ceil(self):
        """With only a number given, the expected values are the next higher integers."""
        for value, expected in ((4.6, 5), (239.3992, 240)):
            assert float_round(value) == expected

    def test_float_round_decimals(self):
        """With a number of decimals given, the expected values are rounded up at that digit."""
        cases = (
            ((23.0091, 2), 23.01),
            ((23.1091, 3), 23.11),
        )
        for args, expected in cases:
            assert float_round(*args) == expected

    def test_float_round_type(self):
        """An explicit rounding function (``math.floor`` or ``round``) is used instead of the default."""
        cases = (
            ((34.9221, 2, math.floor), 34.92),
            ((34.9221, 0, math.floor), 34.),
            ((34.9221, 2, round), 34.92),
            ((34.9221, 0, round), 35.),
        )
        for args, expected in cases:
            assert float_round(*args) == expected

    def test_float_round_negative(self):
        """Negative numbers: floor moves away from zero, the default moves towards zero."""
        cases = (
            ((-34.9221, 2, math.floor), -34.93),
            ((-34.9221, 0, math.floor), -35.),
            ((-34.9221, 2), -34.92),
            ((-34.9221, 0), -34.),
        )
        for args, expected in cases:
            assert float_round(*args) == expected