"""Tests for AbstractDataPrep and DataPrepJoin."""

import datetime as dt
import logging
import os
from operator import itemgetter, lt, gt

import numpy as np
import pandas as pd
import pytest
import xarray as xr

from src.data_handler.data_preparation import AbstractDataPrep
from src.data_handler import DataPrepJoin as DataPrep
from src.helpers.join import EmptyQueryResult


class TestAbstractDataPrep:
    """Tests running against an AbstractDataPrep instance that is created without calling __init__."""

    @pytest.fixture
    def data_prep_no_init(self):
        """Return an AbstractDataPrep built via object.__new__ with only the basic attributes assigned."""
        d = object.__new__(AbstractDataPrep)
        d.path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
        d.station = ['DEBW107']
        d.variables = ['o3', 'temp']
        d.statistics_per_var = {'o3': 'dma8eu', 'temp': 'maximum'}
        d.sampling = "daily"
        d.kwargs = {}
        return d

    @pytest.fixture
    def data(self):
        """Return the `data` attribute of a fully initialised DataPrep for station DEBW107."""
        return DataPrep(os.path.join(os.path.dirname(__file__), 'data'), 'DEBW107', ['o3', 'temp'],
                        statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}, network="AIRBASE").data

    @pytest.fixture
    def data_prep(self, data_prep_no_init, data):
        """Return the un-initialised instance with all state attributes reset to None and `data` attached."""
        # same attributes, in the same order, as the former one-assignment-per-line version
        for attr in ("mean", "std", "history", "label", "observation", "extremes_history", "extremes_label",
                     "data", "meta", "_transform_method"):
            setattr(data_prep_no_init, attr, None)
        data_prep_no_init.data = data
        return data_prep_no_init

    def test_all_placeholders(self, data_prep_no_init):
        """download_data and check_station_meta raise NotImplementedError on the abstract class."""
        with pytest.raises(NotImplementedError):
            data_prep_no_init.download_data("a", "b")
        with pytest.raises(NotImplementedError):
            data_prep_no_init.check_station_meta()

    def test_set_file_name_and_meta(self):
        """File and meta file names are composed from path, station and variables."""
        d = object.__new__(AbstractDataPrep)
        d.path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
        d.station = 'TESTSTATION'
        d.variables = ['a', 'bc']
        d.statistics_per_var = {'a': 'dma8eu', 'bc': 'maximum'}
        base_dir = os.path.abspath(os.path.dirname(__file__))
        assert d._set_file_name() == os.path.join(base_dir, "data/TESTSTATION_a_bc.nc")
        assert d._set_meta_file_name() == os.path.join(base_dir, "data/TESTSTATION_a_bc_meta.csv")

    @pytest.mark.parametrize('opts', [{'dim': 'datetime', 'method': 'nearest', 'limit': 10, 'use_coordinate': True},
                                      {'dim': 'datetime', 'limit': 5}, {'dim': 'datetime'}])
    def test_interpolate(self, data_prep, opts):
        """interpolate() yields the same result as xarray's interpolate_na with defaults filled in."""
        data_org = data_prep.data
        data_prep.interpolate(**opts)
        # set default params if empty; build a new dict because pytest hands over the parametrised dict
        # without copying it, so in-place changes would modify the shared parameter object
        expected_opts = {"method": 'linear', "limit": None, "use_coordinate": True, **opts}
        xr.testing.assert_equal(data_org.interpolate_na(**expected_opts), data_prep.data)

    def test_transform_standardise(self, data_prep):
        """Default transform standardises data to mean 0 / std 1 and stores mean and std as DataArrays."""
        assert data_prep._transform_method is None
        assert data_prep.mean is None
        assert data_prep.std is None
        data_prep.transform('datetime')
        assert data_prep._transform_method == 'standardise'
        np.testing.assert_almost_equal(data_prep.data.mean('datetime').variable.values, np.array([[0, 0]]))
        np.testing.assert_almost_equal(data_prep.data.std('datetime').variable.values, np.array([[1, 1]]))
        assert isinstance(data_prep.mean, xr.DataArray)
        assert isinstance(data_prep.std, xr.DataArray)

    def test_transform_standardise_apply(self, data_prep):
        """Standardising with externally supplied mean and std scales the data by exactly these values."""
        assert data_prep._transform_method is None
        assert data_prep.mean is None
        assert data_prep.std is None
        data_mean_orig = data_prep.data.mean('datetime').variable.values
        data_std_orig = data_prep.data.std('datetime').variable.values
        mean_external = np.array([20, 12])
        std_external = np.array([15, 5])
        mean = xr.DataArray(mean_external, coords={"variables": ['o3', 'temp']}, dims=["variables"])
        std = xr.DataArray(std_external, coords={"variables": ['o3', 'temp']}, dims=["variables"])
        data_prep.transform('datetime', mean=mean, std=std)
        assert all(data_prep.mean.values == mean_external)
        assert all(data_prep.std.values == std_external)
        data_mean_transformed = data_prep.data.mean('datetime').variable.values
        data_std_transformed = data_prep.data.std('datetime').variable.values
        data_mean_expected = (data_mean_orig - mean_external) / std_external  # mean scales as any other data
        data_std_expected = data_std_orig / std_external  # std scales by given std
        np.testing.assert_almost_equal(data_mean_transformed, data_mean_expected)
        np.testing.assert_almost_equal(data_std_transformed, data_std_expected)

    @pytest.mark.parametrize('mean, std, method, msg', [(10, 3, 'standardise', ''), (6, None, 'standardise', 'std, '),
                                                        (None, 3, 'standardise', 'mean, '), (19, None, 'centre', ''),
                                                        (None, 2, 'centre', 'mean, '), (8, 2, 'centre', ''),
                                                        (None, None, 'standardise', 'mean, std, ')])
    def test_check_inverse_transform_params(self, data_prep, mean, std, method, msg):
        """A non-empty `msg` must appear in the raised AttributeError; an empty one means the check passes."""
        if msg:
            with pytest.raises(AttributeError) as e:
                data_prep.check_inverse_transform_params(mean, std, method)
            assert msg in e.value.args[0]
        else:
            assert data_prep.check_inverse_transform_params(mean, std, method) is None

    def test_transform_centre(self, data_prep):
        """Centring moves the mean to 0, leaves the std untouched and does not set `std`."""
        assert data_prep._transform_method is None
        assert data_prep.mean is None
        assert data_prep.std is None
        data_std_orig = data_prep.data.std('datetime').variable.values
        data_prep.transform('datetime', 'centre')
        assert data_prep._transform_method == 'centre'
        np.testing.assert_almost_equal(data_prep.data.mean('datetime').variable.values, np.array([[0, 0]]))
        np.testing.assert_almost_equal(data_prep.data.std('datetime').variable.values, data_std_orig)
        assert data_prep.std is None

    def test_transform_centre_apply(self, data_prep):
        """Centring with an externally supplied mean shifts the data by this mean and keeps the std."""
        assert data_prep._transform_method is None
        assert data_prep.mean is None
        assert data_prep.std is None
        data_mean_orig = data_prep.data.mean('datetime').variable.values
        data_std_orig = data_prep.data.std('datetime').variable.values
        mean_external = np.array([20, 12])
        mean = xr.DataArray(mean_external, coords={"variables": ['o3', 'temp']}, dims=["variables"])
        data_prep.transform('datetime', 'centre', mean=mean)
        assert all(data_prep.mean.values == mean_external)
        assert data_prep.std is None
        data_mean_transformed = data_prep.data.mean('datetime').variable.values
        data_std_transformed = data_prep.data.std('datetime').variable.values
        data_mean_expected = (data_mean_orig - mean_external)  # mean scales as any other data
        np.testing.assert_almost_equal(data_mean_transformed, data_mean_expected)
        np.testing.assert_almost_equal(data_std_transformed, data_std_orig)

    @pytest.mark.parametrize('method', ['standardise', 'centre'])
    def test_transform_inverse(self, data_prep, method):
        """Both inverse_transform() and transform(inverse=True) restore the data and reset the state."""
        data_org = data_prep.data
        data_prep.transform('datetime', method)
        data_prep.inverse_transform()
        assert data_prep._transform_method is None
        assert data_prep.mean is None
        assert data_prep.std is None
        np.testing.assert_array_almost_equal(data_org, data_prep.data)
        data_prep.transform('datetime', method)
        data_prep.transform('datetime', inverse=True)
        assert data_prep._transform_method is None
        assert data_prep.mean is None
        assert data_prep.std is None
        np.testing.assert_array_almost_equal(data_org, data_prep.data)

    @pytest.mark.parametrize('method', ['normalise', 'unknownmethod'])
    def test_transform_errors(self, data_prep, method):
        """These methods raise NotImplementedError; an already set method raises AssertionError."""
        with pytest.raises(NotImplementedError):
            data_prep.transform('datetime', method)
        data_prep._transform_method = method
        with pytest.raises(AssertionError) as e:
            data_prep.transform('datetime', method)
        assert "Transform method is already set." in e.value.args[0]

    @pytest.mark.parametrize('method', ['normalise', 'unknownmethod'])
    def test_transform_inverse_errors(self, data_prep, method):
        """Inverse without a set method raises AssertionError; with these methods NotImplementedError."""
        with pytest.raises(AssertionError) as e:
            data_prep.inverse_transform()
        assert "Inverse transformation method is not set." in e.value.args[0]
        data_prep.mean = 1
        data_prep.std = 1
        data_prep._transform_method = method
        with pytest.raises(NotImplementedError):
            data_prep.inverse_transform()

    def test_get_transformation_information(self, data_prep):
        """Returns (None, None, None) before transforming and (mean, std, method) for 'o3' afterwards."""
        assert (None, None, None) == data_prep.get_transformation_information("o3")
        mean_test = data_prep.data.mean("datetime").sel(variables='o3').values
        std_test = data_prep.data.std("datetime").sel(variables='o3').values
        data_prep.transform('datetime')
        mean, std, info = data_prep.get_transformation_information("o3")
        np.testing.assert_almost_equal(mean, mean_test)
        np.testing.assert_almost_equal(std, std_test)
        assert info == "standardise"

    def test_remove_nan_no_hist_or_label(self, data_prep):
        """remove_nan resets history, label and observation as long as one of them is missing."""
        assert not any([data_prep.history, data_prep.label, data_prep.observation])
        data_prep.remove_nan('datetime')
        assert not any([data_prep.history, data_prep.label, data_prep.observation])
        data_prep.make_history_window('variables', 6, 'datetime')
        assert data_prep.history is not None
        data_prep.remove_nan('datetime')
        assert data_prep.history is None
        data_prep.make_labels('variables', 'o3', 'datetime', 2)
        data_prep.make_observation('variables', 'o3', 'datetime')
        assert data_prep.label is not None
        assert data_prep.observation is not None
        data_prep.remove_nan('datetime')
        assert not any([data_prep.history, data_prep.label, data_prep.observation])

    def test_remove_nan(self, data_prep):
        """After remove_nan the history is NaN free and history, label, observation share the time axis length."""
        data_prep.make_history_window('variables', -12, 'datetime')
        data_prep.make_labels('variables', 'o3', 'datetime', 3)
        data_prep.make_observation('variables', 'o3', 'datetime')
        shape = data_prep.history.shape
        data_prep.remove_nan('datetime')
        assert data_prep.history.isnull().sum() == 0
        assert itemgetter(0, 1, 3)(shape) == itemgetter(0, 1, 3)(data_prep.history.shape)
        assert shape[2] >= data_prep.history.shape[2]
        remaining_len = data_prep.history.datetime.shape
        assert remaining_len == data_prep.label.datetime.shape
        assert remaining_len == data_prep.observation.datetime.shape

    def test_remove_nan_too_short(self, data_prep):
        """With min_length above the series length remove_nan leaves history, label and observation empty."""
        data_prep.kwargs["min_length"] = 4000  # actual length of series is 3940
        data_prep.make_history_window('variables', -12, 'datetime')
        data_prep.make_labels('variables', 'o3', 'datetime', 3)
        data_prep.make_observation('variables', 'o3', 'datetime')
        data_prep.remove_nan('datetime')
        assert not any([data_prep.history, data_prep.label, data_prep.observation])

    def test_create_index_array(self, data_prep):
        """create_index_array returns an array named after the dimension holding the given range."""
        index_array = data_prep.create_index_array('window', range(1, 4))
        np.testing.assert_array_equal(index_array.data, [1, 2, 3])
        assert index_array.name == 'window'
        assert index_array.coords.dims == ('window',)
        index_array = data_prep.create_index_array('window', range(0, 1))
        np.testing.assert_array_equal(index_array.data, [0])
        assert index_array.name == 'window'
        assert index_array.coords.dims == ('window',)

    @staticmethod
    def extract_window_data(res, orig, w):
        """
        Select comparable 'temp' values of station DEBW107 from shifted and original data.

        :param res: shifted data; all its values at datetime 1997-01-06 are selected
        :param orig: original data; values are selected on a daily date range relative to 1997-01-06
        :param w: window size as passed to shift; for w <= 0 the range starts w days from the reference date and
            covers abs(w) + 1 days, otherwise it starts one day after it and covers w days
        :return: tuple of flattened arrays (values from res, values from orig)
        """
        reference_sel = {'variables': ['temp'], 'Stations': 'DEBW107', 'datetime': dt.datetime(1997, 1, 6)}
        window = res.sel(reference_sel).data.flatten()
        if w <= 0:
            delta, periods = w, abs(w) + 1
        else:
            delta, periods = 1, w
        start = dt.date(1997, 1, 6) + dt.timedelta(days=delta)
        range_sel = {'variables': ['temp'], 'Stations': 'DEBW107',
                     'datetime': pd.date_range(start, periods=periods, freq='D')}
        orig_slice = orig.sel(range_sel).data.flatten()
        return window, orig_slice

    def test_shift(self, data_prep):
        """shift adds a leading 'window' dimension whose entries match the original data in the date range."""
        res = data_prep.shift('datetime', 4)
        window, orig = self.extract_window_data(res, data_prep.data, 4)
        assert res.coords.dims == ('window', 'Stations', 'datetime', 'variables')
        assert list(res.data.shape) == [4, *data_prep.data.shape]
        np.testing.assert_array_equal(orig, window)
        res = data_prep.shift('datetime', -3)
        window, orig = self.extract_window_data(res, data_prep.data, -3)
        assert list(res.data.shape) == [4, *data_prep.data.shape]
        np.testing.assert_array_equal(orig, window)
        res = data_prep.shift('datetime', 0)
        window, orig = self.extract_window_data(res, data_prep.data, 0)
        assert list(res.data.shape) == [1, *data_prep.data.shape]
        np.testing.assert_array_equal(orig, window)

    def test_make_history_window(self, data_prep):
        """History is created and is identical for window sizes 5 and -5."""
        assert data_prep.history is None
        data_prep.make_history_window("variables", 5, "datetime")
        assert data_prep.history is not None
        save_history = data_prep.history
        data_prep.make_history_window("variables", -5, "datetime")
        np.testing.assert_array_equal(data_prep.history, save_history)

    def test_make_labels(self, data_prep):
        """Labels for 'o3' have the expected shape and are identical for window sizes 3 and -3."""
        assert data_prep.label is None
        data_prep.make_labels('variables', 'o3', 'datetime', 3)
        assert data_prep.label.variables.data == 'o3'
        assert list(data_prep.label.shape) == [3, *data_prep.data.shape[:2]]
        save_label = data_prep.label.copy()
        data_prep.make_labels('variables', 'o3', 'datetime', -3)
        np.testing.assert_array_equal(data_prep.label, save_label)

    def test_make_labels_multiple(self, data_prep):
        """Labels for two target variables carry a trailing dimension of length 2."""
        assert data_prep.label is None
        data_prep.make_labels("variables", ["o3", "temp"], "datetime", 4)
        assert all(data_prep.label.variables.data == ["o3", "temp"])
        assert list(data_prep.label.shape) == [4, *data_prep.data.shape[:2], 2]

    def test_make_observation(self, data_prep):
        """Observation for 'o3' has shape [1, 1, number of time steps]."""
        assert data_prep.observation is None
        data_prep.make_observation("variables", "o3", "datetime")
        assert data_prep.observation.variables.data == "o3"
        assert list(data_prep.observation.shape) == [1, 1, data_prep.data.datetime.shape[0]]

    def test_make_observation_multiple(self, data_prep):
        """Observation for two variables has shape [1, 1, number of time steps, 2]."""
        assert data_prep.observation is None
        data_prep.make_observation("variables", ["o3", "temp"], "datetime")
        assert all(data_prep.observation.variables.data == ["o3", "temp"])
        assert list(data_prep.observation.shape) == [1, 1, data_prep.data.datetime.shape[0], 2]

    def test_slice(self, data_prep):
        """_slice from 1997-01-01 to 1997-01-10 keeps 10 entries on axis 1 and leaves axes 0 and 2 as is."""
        res = data_prep._slice(data_prep.data, dt.date(1997, 1, 1), dt.date(1997, 1, 10), 'datetime')
        assert itemgetter(0, 2)(res.shape) == itemgetter(0, 2)(data_prep.data.shape)
        assert res.shape[1] == 10

    def test_slice_prep(self, data_prep):
        """_slice_prep keeps the shape without start/end and cuts to the range given in kwargs otherwise."""
        res = data_prep._slice_prep(data_prep.data)
        assert res.shape == data_prep.data.shape
        data_prep.kwargs['start'] = res.coords['datetime'][0].values
        data_prep.kwargs['end'] = res.coords['datetime'][9].values
        res = data_prep._slice_prep(data_prep.data)
        assert itemgetter(0, 2)(res.shape) == itemgetter(0, 2)(data_prep.data.shape)
        assert res.shape[1] == 10

    def test_check_for_neg_concentrations(self, data_prep):
        """The minimum of 'o3' is not below 0 by default, or not below the given `minimum`."""
        res = data_prep.check_for_negative_concentrations(data_prep.data)
        assert res.sel({'variables': 'o3'}).min() >= 0
        res = data_prep.check_for_negative_concentrations(data_prep.data, minimum=2)
        assert res.sel({'variables': 'o3'}).min() >= 2

    def test_get_transposed_history(self, data_prep):
        """Transposed history has dimension order (datetime, window, Stations, variables)."""
        data_prep.make_history_window("variables", 3, "datetime")
        transposed = data_prep.get_transposed_history()
        assert transposed.coords.dims == ("datetime", "window", "Stations", "variables")

    def test_get_transposed_label(self, data_prep):
        """Transposed label has dimension order (datetime, window)."""
        data_prep.make_labels("variables", "o3", "datetime", 2)
        transposed = data_prep.get_transposed_label()
        assert transposed.coords.dims == ("datetime", "window")

    def test_multiply_extremes(self, data_prep):
        """With a single threshold the extremes labels hold as many values beyond +-1 as the labels."""
        data_prep.transform("datetime")
        data_prep.make_history_window("variables", 3, "datetime")
        data_prep.make_labels("variables", "o3", "datetime", 2)
        orig = data_prep.label
        data_prep.multiply_extremes(1)
        upsampled = data_prep.extremes_label
        assert (upsampled > 1).sum() == (orig > 1).sum()
        assert (upsampled < -1).sum() == (orig < -1).sum()

    def test_multiply_extremes_from_list(self, data_prep):
        """With a list of thresholds the counts of extreme samples add up with the weights asserted below."""
        data_prep.transform("datetime")
        data_prep.make_history_window("variables", 3, "datetime")
        data_prep.make_labels("variables", "o3", "datetime", 2)
        orig = data_prep.label
        data_prep.multiply_extremes([1, 1.5, 2, 3])
        upsampled = data_prep.extremes_label

        def f(d, op, n):
            # number of samples with at least one window entry satisfying op(value, n)
            return op(d, n).any(dim="window").sum()

        assert f(upsampled, gt, 1) == sum(
            [f(orig, gt, 1), f(orig, gt, 1.5), f(orig, gt, 2) * 2, f(orig, gt, 3) * 4])
        assert f(upsampled, lt, -1) == sum(
            [f(orig, lt, -1), f(orig, lt, -1.5), f(orig, lt, -2) * 2, f(orig, lt, -3) * 4])

    def test_multiply_extremes_wrong_extremes(self, data_prep):
        """A non-numeric element in the list of thresholds raises a TypeError naming the element."""
        data_prep.transform("datetime")
        data_prep.make_history_window("variables", 3, "datetime")
        data_prep.make_labels("variables", "o3", "datetime", 2)
        with pytest.raises(TypeError) as e:
            data_prep.multiply_extremes([1, "1.5", 2])
        expected_msg = ("Elements of list extreme_values have to be (<class 'float'>, <class 'int'>), but at least "
                        "element 1.5 is type <class 'str'>")
        assert expected_msg in e.value.args[0]

    def test_multiply_extremes_right_tail(self, data_prep):
        """With extremes_on_right_tail_only no sample below -1 is contained in the extremes labels."""
        data_prep.transform("datetime")
        data_prep.make_history_window("variables", 3, "datetime")
        data_prep.make_labels("variables", "o3", "datetime", 2)
        orig = data_prep.label
        data_prep.multiply_extremes([1, 2], extremes_on_right_tail_only=True)
        upsampled = data_prep.extremes_label

        def f(d, op, n):
            # number of samples with at least one window entry satisfying op(value, n)
            return op(d, n).any(dim="window").sum()

        assert f(upsampled, gt, 1) == sum([f(orig, gt, 1), f(orig, gt, 2)])
        assert upsampled.shape[2] == sum([f(orig, gt, 1), f(orig, gt, 2)])
        assert f(upsampled, lt, -1) == 0

    def test_multiply_extremes_none_label(self, data_prep):
        """multiply_extremes returns None if label is None."""
        data_prep.transform("datetime")
        data_prep.make_history_window("variables", 3, "datetime")
        data_prep.label = None
        assert data_prep.multiply_extremes([1], extremes_on_right_tail_only=False) is None

    def test_multiply_extremes_none_history(self, data_prep):
        """multiply_extremes returns None if history is None."""
        data_prep.transform("datetime")
        data_prep.history = None
        data_prep.make_labels("variables", "o3", "datetime", 2)
        assert data_prep.multiply_extremes([1], extremes_on_right_tail_only=False) is None

    def test_multiply_extremes_none_label_history(self, data_prep):
        """multiply_extremes returns None if both history and label are None."""
        data_prep.history = None
        data_prep.label = None
        assert data_prep.multiply_extremes([1], extremes_on_right_tail_only=False) is None

    def test_get_extremes_history(self, data_prep):
        """get_extremes_history equals extremes_history transposed to (datetime, window, Stations, variables)."""
        data_prep.transform("datetime")
        data_prep.make_history_window("variables", 3, "datetime")
        data_prep.make_labels("variables", "o3", "datetime", 2)
        data_prep.make_observation("variables", "o3", "datetime")
        data_prep.remove_nan("datetime")
        data_prep.multiply_extremes([1, 2], extremes_on_right_tail_only=True)
        expected = data_prep.extremes_history.transpose("datetime", "window", "Stations", "variables")
        assert (data_prep.get_extremes_history() == expected).all()

    def test_get_extremes_label(self, data_prep):
        """get_extremes_label equals extremes_label squeezed on Stations and transposed to (datetime, window)."""
        data_prep.transform("datetime")
        data_prep.make_history_window("variables", 3, "datetime")
        data_prep.make_labels("variables", "o3", "datetime", 2)
        data_prep.make_observation("variables", "o3", "datetime")
        data_prep.remove_nan("datetime")
        data_prep.multiply_extremes([1, 2], extremes_on_right_tail_only=True)
        expected = data_prep.extremes_label.squeeze("Stations").transpose("datetime", "window")
        assert (data_prep.get_extremes_label() == expected).all()


class TestDataPrepJoin:
    """Tests for DataPrepJoin (imported as DataPrep), including loading and downloading of data."""

    @pytest.fixture
    def data(self):
        """Return a fully initialised DataPrep for station DEBW107 with an additional kwarg `test`."""
        return DataPrep(os.path.join(os.path.dirname(__file__), 'data'), 'DEBW107', ['o3', 'temp'],
                        station_type='background', test='testKWARGS', network="AIRBASE",
                        statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})

    @pytest.fixture
    def data_prep_no_init(self):
        """Return a DataPrep built via object.__new__ with only the basic attributes assigned."""
        d = object.__new__(DataPrep)
        d.path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
        d.network = 'UBA'
        d.station = ['DEBW107']
        d.variables = ['o3', 'temp']
        d.statistics_per_var = {'o3': 'dma8eu', 'temp': 'maximum'}
        d.station_type = "background"
        d.sampling = "daily"
        d.kwargs = None
        return d

    def test_init(self, data):
        """Constructor arguments are stored on the instance and additional kwargs end up in `kwargs`."""
        assert data.path == os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
        assert data.network == 'AIRBASE'
        assert data.station == ['DEBW107']
        assert data.variables == ['o3', 'temp']
        assert data.station_type == "background"
        assert data.statistics_per_var == {'o3': 'dma8eu', 'temp': 'maximum'}
        assert not any([data.mean, data.std, data.history, data.label, data.observation])
        assert {'test': 'testKWARGS'}.items() <= data.kwargs.items()

    def test_init_no_stats(self):
        """Creating DataPrep without statistics_per_var raises NotImplementedError."""
        with pytest.raises(NotImplementedError):
            DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'])

    def test_download_data(self, data_prep_no_init):
        """download_data returns an xarray DataArray and a pandas DataFrame."""
        file_name = data_prep_no_init._set_file_name()
        meta_file = data_prep_no_init._set_meta_file_name()
        data_prep_no_init.kwargs = {"store_data_locally": False}
        data_prep_no_init.statistics_per_var = {'o3': 'dma8eu', 'temp': 'maximum'}
        data, meta = data_prep_no_init.download_data(file_name, meta_file)
        assert isinstance(data, xr.DataArray)
        assert isinstance(meta, pd.DataFrame)

    def test_download_data_from_join(self, data_prep_no_init):
        """download_data_from_join returns an xarray DataArray and a pandas DataFrame."""
        file_name = data_prep_no_init._set_file_name()
        meta_file = data_prep_no_init._set_meta_file_name()
        data_prep_no_init.kwargs = {"store_data_locally": False}
        data_prep_no_init.statistics_per_var = {'o3': 'dma8eu', 'temp': 'maximum'}
        xarr, meta = data_prep_no_init.download_data_from_join(file_name, meta_file)
        assert isinstance(xarr, xr.DataArray)
        assert isinstance(meta, pd.DataFrame)

    def test_check_station_meta(self, caplog, data_prep_no_init):
        """A station_type differing from the meta data raises FileNotFoundError and logs a debug message."""
        caplog.set_level(logging.DEBUG)
        file_name = data_prep_no_init._set_file_name()
        meta_file = data_prep_no_init._set_meta_file_name()
        data_prep_no_init.kwargs = {"store_data_locally": False}
        data_prep_no_init.statistics_per_var = {'o3': 'dma8eu', 'temp': 'maximum'}
        _, meta = data_prep_no_init.download_data(file_name, meta_file)
        data_prep_no_init.meta = meta
        assert data_prep_no_init.check_station_meta() is None
        data_prep_no_init.station_type = "traffic"
        with pytest.raises(FileNotFoundError):
            data_prep_no_init.check_station_meta()
        msg = "meta data does not agree with given request for station_type: traffic (requested) != background (local)"
        # last record has to come from the root logger with level 10 (logging.DEBUG)
        assert caplog.record_tuples[-1][:-1] == ('root', 10)
        assert msg in caplog.record_tuples[-1][-1]

    def test_load_data_overwrite_local_data(self, data_prep_no_init):
        """With overwrite_local_data each load_data call rewrites the data file and the meta file."""
        data_prep_no_init.statistics_per_var = {'o3': 'dma8eu', 'temp': 'maximum'}
        file_path = data_prep_no_init._set_file_name()
        meta_file_path = data_prep_no_init._set_meta_file_name()
        # start from a clean state without any local files
        for path in (file_path, meta_file_path):
            if os.path.exists(path):
                os.remove(path)
        assert not os.path.exists(file_path)
        assert not os.path.exists(meta_file_path)
        data_prep_no_init.kwargs = {"overwrite_local_data": True}
        data_prep_no_init.load_data()
        assert os.path.exists(file_path)
        assert os.path.exists(meta_file_path)
        t = os.stat(file_path).st_ctime
        tm = os.stat(meta_file_path).st_ctime
        data_prep_no_init.load_data()
        assert os.path.exists(file_path)
        assert os.path.exists(meta_file_path)
        assert os.stat(file_path).st_ctime > t
        assert os.stat(meta_file_path).st_ctime > tm
        assert isinstance(data_prep_no_init.data, xr.DataArray)
        assert isinstance(data_prep_no_init.meta, pd.DataFrame)

    def test_load_data_keep_local_data(self, data_prep_no_init):
        """Without overwrite_local_data a second load_data call leaves the data file's ctime unchanged."""
        data_prep_no_init.statistics_per_var = {'o3': 'dma8eu', 'temp': 'maximum'}
        data_prep_no_init.station_type = None
        data_prep_no_init.kwargs = {}
        file_path = data_prep_no_init._set_file_name()
        data_prep_no_init.load_data()
        assert os.path.exists(file_path)
        t = os.stat(file_path).st_ctime
        data_prep_no_init.load_data()
        assert os.path.exists(data_prep_no_init._set_file_name())
        assert os.stat(file_path).st_ctime == t
        assert isinstance(data_prep_no_init.data, xr.DataArray)
        assert isinstance(data_prep_no_init.meta, pd.DataFrame)

    def test_repr(self, data_prep_no_init):
        """repr contains path, network, station, variables, station_type and kwargs."""
        path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
        expected = (f"Dataprep(path='{path}', network='UBA', "
                    "station=['DEBW107'], variables=['o3', 'temp'], "
                    "station_type=background, **None)")
        assert repr(data_prep_no_init).rstrip() == expected.rstrip()

    def test_check_station(self, data):
        """Requesting station_type 'traffic' for station DEBW107 raises EmptyQueryResult."""
        with pytest.raises(EmptyQueryResult):
            DataPrep(os.path.join(os.path.dirname(__file__), 'data'), 'dummy', 'DEBW107', ['o3', 'temp'],
                     station_type='traffic', statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})