diff --git a/src/data_preparation.py b/src/data_preparation.py index cc2abdf1f0d3bd1fe55fc69d2ace9eb42ead2038..34ff8dfb00b89fdee6494273503e47132a48a61f 100644 --- a/src/data_preparation.py +++ b/src/data_preparation.py @@ -7,23 +7,17 @@ import pandas as pd import logging import os from src import join, helpers -from typing import Union, List +from src import statistics +from typing import Union, List, Dict -class DataPrep: +class DataPrep(object): - def __init__(self, path: str, network: str, station: Union[str, List[str]], variables, **kwargs): - self.path = path + def __init__(self, path: str, network: str, station: Union[str, List[str]], variables: List[str], **kwargs): + self.path = os.path.abspath(path) self.network = network self.station = helpers.to_list(station) self.variables = variables - self.statistics_per_var = kwargs.get("statistics_per_var", None) - if self.statistics_per_var is not None: - self.load_data() - else: - raise NotImplementedError - # self.data, self.meta = Fkf.read_hourly_data_from_csv_to_xarray(self.path, self.network, self.station, - # self.variables, **kwargs) self.mean = None self.std = None self.df = None @@ -32,6 +26,14 @@ class DataPrep: self.kwargs = kwargs self.data = None self.meta = None + self._transform_method = None + self.statistics_per_var = kwargs.get("statistics_per_var", None) + if self.statistics_per_var is not None: + self.load_data() + else: + raise NotImplementedError + # self.data, self.meta = Fkf.read_hourly_data_from_csv_to_xarray(self.path, self.network, self.station, + # self.variables, **kwargs) def load_data(self): self.check_path_and_create() @@ -54,30 +56,112 @@ class DataPrep: self.meta.to_csv(meta_file) def _set_file_name(self): - return f"{self.path}{''.join(self.station)}_{'_'.join(sorted(self.variables))}.nc" + return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(sorted(self.variables))}.nc") def _set_meta_file_name(self): - return 
f"{self.path}{''.join(self.station)}_{'_'.join(sorted(self.variables))}_meta.csv" + return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(sorted(self.variables))}_meta.csv") def __repr__(self): return f"Dataprep(path='{self.path}', network='{self.network}', station={self.station}, " \ - f"variables={self.variables}, **{self.kwargs}" + f"variables={self.variables}, **{self.kwargs})" def check_path_and_create(self): try: os.makedirs(self.path) - logging.info("Created path: {}".format(self.path)) + logging.info(f"Created path: {self.path}") except FileExistsError: + logging.info(f"Path already exists: {self.path}") pass - def interpolate(self, dim=None, method='linear', limit=None, use_coordinate=True, **kwargs): - raise NotImplementedError + def interpolate(self, dim: str = None, method: str = 'linear', limit: int = None, + use_coordinate: Union[bool, str] = True, **kwargs): + """ + (Copy paste from dataarray.interpolate_na) + Interpolate values according to different methods. + + :param dim: + Specifies the dimension along which to interpolate. + :param method: + {'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', + 'polynomial', 'barycentric', 'krog', 'pchip', + 'spline', 'akima'}, optional + String indicating which method to use for interpolation: + + - 'linear': linear interpolation (Default). Additional keyword + arguments are passed to ``numpy.interp`` + - 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', + 'polynomial': are passed to ``scipy.interpolate.interp1d``. If + method=='polynomial', the ``order`` keyword argument must also be + provided. + - 'barycentric', 'krog', 'pchip', 'spline', and `akima`: use their + respective``scipy.interpolate`` classes. + :param limit: + default None + Maximum number of consecutive NaNs to fill. Must be greater than 0 + or None for no limit. + :param use_coordinate: + default True + Specifies which index to use as the x values in the interpolation + formulated as `y = f(x)`. 
If False, values are treated as if +            equally-spaced along `dim`. If True, the IndexVariable `dim` is +            used. If use_coordinate is a string, it specifies the name of a +            coordinate variable to use as the index. +        :param kwargs: +        :return: xarray.DataArray +        """ + +        self.data = self.data.interpolate_na(dim=dim, method=method, limit=limit, use_coordinate=use_coordinate, +                                             **kwargs)      def restandardise(self, data, dim='variables', **kwargs): -        raise NotImplementedError - -    def standardise(self, dim): -        raise NotImplementedError +        """ + +        :param data: +        :param dim: +        :param kwargs: +        :return: +        """ +        variables = kwargs.get('variables', None) +        if variables is None: +            return FKf.restandardize(data, mean=self.mean, std=self.std, stand=True) +        else: +            return FKf.restandardize(data, +                                     mean=self.mean.sel({dim: variables}).values, +                                     std=self.std.sel({dim: variables}).values, +                                     stand=True) + +    def transform(self, dim: Union[str, int] = 0, method: str = 'standardise') -> None: +        """ +        This function transforms a xarray.dataarray (along dim) or pandas.DataFrame (along axis) either with mean=0 +        and std=1 (`method=standardise`) or centers the data with mean=0 and no change in data scale +        (`method=centre`). Furthermore, this sets an internal instance attribute for later inverse transformation + +        :param string/int dim: +            | for xarray.DataArray as string: name of dimension which should be standardised +            | for pandas.DataFrame as int: axis of dimension which should be standardised +        :param method: +        :return: xarray.DataArrays or pandas.DataFrames: +            #. mean: Mean of data +            #. std: Standard deviation of data +            #. 
data: Standardised data + """ + + def f(data): + if method == 'standardise': + return statistics.standardise(data, dim) + elif method == 'centre': + return statistics.centre(data, dim) + elif method == 'normalise': + # use min/max of data or given min/max + raise NotImplementedError + else: + raise NotImplementedError + + if self._transform_method is not None: + raise AssertionError(f"Transform method is already set. Therefore, data was already transformed with " + f"{self._transform_method}. Please perform inverse transformation of data first.") + self._transform_method = method + self.mean, self.std, self.data = f(self.data) def make_history_window(self, dim, window): raise NotImplementedError @@ -95,6 +179,13 @@ class DataPrep: def create_indexarray(index_name, index_values): raise NotImplementedError + def _slice_prep(self, data, coord='datetime'): + raise NotImplementedError + + @staticmethod + def _slice(data, start, end, coord): + raise NotImplementedError + if __name__ == "__main__": diff --git a/test/test_data_preparation.py b/test/test_data_preparation.py new file mode 100644 index 0000000000000000000000000000000000000000..7283196fe479f1aecee84c8491d956ac57a097fe --- /dev/null +++ b/test/test_data_preparation.py @@ -0,0 +1,85 @@ +import pytest +import os +from src.data_preparation import DataPrep +import logging +import numpy as np +import xarray as xr + + +class TestDataPrep: + + @pytest.fixture + def data(self): + return DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], test='testKWARGS', + statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) + + def test_init(self, data): + assert data.path == os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data') + assert data.network == 'dummy' + assert data.station == ['DEBW107'] + assert data.variables == ['o3', 'temp'] + assert data.statistics_per_var == {'o3': 'dma8eu', 'temp': 'maximum'} + assert not all([data.mean, data.std, data.df, data.history, data.label]) + assert {'test': 
'testKWARGS'}.items() <= data.kwargs.items() + + def test_init_no_stats(self): + with pytest.raises(NotImplementedError): + DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp']) + + def test_check_path_and_create(self, caplog): + caplog.set_level(logging.INFO) + d = object.__new__(DataPrep) + d.path = 'data/test' + assert not os.path.exists('data/test') + d.check_path_and_create() + assert os.path.exists('data/test') + assert caplog.messages[0] == "Created path: data/test" + d.check_path_and_create() + assert caplog.messages[1] == "Path already exists: data/test" + os.rmdir('data/test') + + def test_repr(self): + d = object.__new__(DataPrep) + d.path = 'data/test' + d.network = 'dummy' + d.station = ['DEBW107'] + d.variables = ['o3', 'temp'] + d.kwargs = None + assert d.__repr__().rstrip() == "Dataprep(path='data/test', network='dummy', station=['DEBW107'], "\ + "variables=['o3', 'temp'], **None)".rstrip() + + def test_set_file_name_and_meta(self): + d = object.__new__(DataPrep) + d.path = os.path.abspath('data/test') + d.station = 'TESTSTATION' + d.variables = ['a', 'bc'] + assert d._set_file_name() == os.path.join(os.path.abspath(os.path.dirname(__file__)), + "data/test/TESTSTATION_a_bc.nc") + assert d._set_meta_file_name() == os.path.join(os.path.abspath(os.path.dirname(__file__)), + "data/test/TESTSTATION_a_bc_meta.csv") + + def test_interpolate(self): + pass + + def test_transform_standardise(self, data): + assert data._transform_method is None + assert data.mean is None + assert data.std is None + data.transform('datetime') + assert data._transform_method == 'standardise' + assert np.testing.assert_almost_equal(data.data.mean('datetime').variable.values, np.array([[0, 0]])) is None + assert np.testing.assert_almost_equal(data.data.std('datetime').variable.values, np.array([[1, 1]])) is None + assert isinstance(data.mean, xr.DataArray) + assert isinstance(data.std, xr.DataArray) + + def test_transform_centre(self, data): + assert data._transform_method is None 
+ assert data.mean is None + assert data.std is None + data_std_org = data.data.std('datetime'). variable.values + data.transform('datetime', 'centre') + assert data._transform_method == 'centre' + assert np.testing.assert_almost_equal(data.data.mean('datetime').variable.values, np.array([[0, 0]])) is None + assert np.testing.assert_almost_equal(data.data.std('datetime').variable.values, data_std_org) is None + assert data.std is None +