diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 0000000000000000000000000000000000000000..baa10de454893675aeedc6275e2c6725b0b84966
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,25 @@
+# .coveragerc to control coverage.py
+[run]
+branch = True
+
+
+[report]
+# Regexes for lines to exclude from consideration
+exclude_lines =
+    # Have to re-enable the standard pragma
+    pragma: no cover
+
+    # Don't complain about missing debug-only code:
+    def __repr__
+    if self\.debug
+
+    # Don't complain if tests don't hit defensive assertion code:
+    raise AssertionError
+    raise NotImplementedError
+
+    # Don't complain if non-runnable code isn't run:
+    if 0:
+    if __name__ == .__main__.:
+
+    # Don't complain about import statements
+    import
diff --git a/.gitignore b/.gitignore
index 56b52c1e8a076839cfd155613b04a95ffaa19541..e0fbe0994d8a7c695caffb5fe3135d0cc54abc0b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -40,10 +40,18 @@ Thumbs.db
 .idea/
 /venv/
 
-# check plot folder #
-#####################
+# don't check data and plot folder #
+####################################
+/data/
 /plots/
 
 # tmp folder #
 ##############
 /tmp/
+
+# test related data #
+#####################
+.coverage
+htmlcov/
+.pytest_cache
+/test/data/
diff --git a/requirements.txt b/requirements.txt
index a956e8ff221dd8425a3cad3df4a45c334d58a040..4f4c9fd2d3311335f43fe9efa367205a2e0c196f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,10 @@
 Keras==2.2.4
 numpy==1.15.4
 tensorflow==1.12.0
+xarray==0.14.0
+pandas==0.25.1
+requests==2.22.0
 pytest==5.2.1
-pydot
\ No newline at end of file
+pytest-lazy-fixture==0.6.1
+pytest-cov
+pydot
diff --git a/src/data_preparation.py b/src/data_preparation.py
new file mode 100644
index 0000000000000000000000000000000000000000..c23c9a9f962848f63a3ff2aa8b6e9f012dd562e4
--- /dev/null
+++ b/src/data_preparation.py
@@ -0,0 +1,359 @@
+__author__ = 'Felix Kleinert, Lukas Leufen'
+__date__ = '2019-10-16'
+
+
+import xarray as xr
+import pandas as pd
+import numpy as np
+import logging
+import os
+from src import join, helpers
+from src import statistics
+from typing import Union, List, Iterable
+import datetime as dt
+
+
+# define a more general date type for type hinting
+date = Union[dt.date, dt.datetime]
+
+
+class DataPrep(object):
+    """
+    This class prepares data to be used in neural networks. The instance searches for locally stored data that meet
+    the given demands. If no local data are found, the DataPrep instance downloads the data from the TOAR database
+    and stores them locally for reuse. For the moment, only daily aggregated time series are supported. The
+    aggregation can be set manually and can differ for each variable.
+
+    After data loading, different pre-processing steps can be executed to prepare the data for further applications.
+    In particular, the following methods are available for the pre-processing step:
+    - interpolate: interpolate between data points by using xarray's interpolation method
+    - standardise: standardise data to mean=0 and std=1, or centre to mean=0; additional methods like normalising to
+      the interval [0, 1] are not implemented yet.
+    - make window history: represent the history (preceding time steps) for training/testing; X
+    - make labels: create the target vector with the given lead time steps for training/testing; y
+    - remove NaNs jointly from the desired input and output, i.e. only keep time steps where no NaNs are present in
+      X AND y. Use this method after the creation of the window history and the labels to clean up the data cube.
+
+    To create a DataPrep instance, one needs to specify the station by id (e.g. "DEBW107"), its network (e.g. UBA,
+    "Umweltbundesamt") and the variables to use. Further options can be set in the instance:
+    * `statistics_per_var`: define a specific statistic to extract from the TOAR database for each variable.
+    * `start`: define a start date for the data cube creation. Default: use the first entry of the time series
+    * `end`: set the end date for the data cube. Default: use the last date of the time series.
+    * `store_data_locally`: store recently downloaded data on the local disk. Default: True
+    * set further parameters for xarray's interpolation methods to modify the interpolation scheme
+
+    """
+
+    def __init__(self, path: str, network: str, station: Union[str, List[str]], variables: List[str], **kwargs):
+        self.path = os.path.abspath(path)
+        self.network = network
+        self.station = helpers.to_list(station)
+        self.variables = variables
+        self.mean = None
+        self.std = None
+        self.history = None
+        self.label = None
+        self.kwargs = kwargs
+        self.data = None
+        self.meta = None
+        self._transform_method = None
+        self.statistics_per_var = kwargs.get("statistics_per_var", None)
+        if self.statistics_per_var is not None:
+            self.load_data()
+        else:
+            raise NotImplementedError  # hourly data usage is not implemented yet
+            # self.data, self.meta = Fkf.read_hourly_data_from_csv_to_xarray(self.path, self.network, self.station,
+            #                                                                self.variables, **kwargs)
+
+    def load_data(self):
+        """
+        Load data and meta data either from local disk (preferred) or download new data from the TOAR database if no
+        local data are available. In the latter case, the downloaded data are stored locally if wished (default:
+        True).
+        """
+
+        self.check_path_and_create()
+        file_name = self._set_file_name()
+        meta_file = self._set_meta_file_name()
+        try:
+            data = self._slice_prep(xr.open_dataarray(file_name))
+            self.data = self.check_for_negative_concentrations(data)
+            self.meta = pd.read_csv(meta_file, index_col=0)
+        except FileNotFoundError as e:
+            logging.warning(e)
+            data, self.meta = self.download_data_from_join(file_name, meta_file)
+            data = self._slice_prep(data)
+            self.data = self.check_for_negative_concentrations(data)
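The constructor immediately triggers this local-first loading strategy: the first call downloads from JOIN and caches the result, later calls read the cache from disk. A minimal usage sketch (station and statistics mirror the `__main__` example at the bottom of this file):

```python
from src.data_preparation import DataPrep

# first run: downloads from JOIN and writes DEBW107_o3_temp.nc (+ meta csv) into data/
# later runs: read the cached files instead of querying the database again
dp = DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'],
              statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
```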
+
+    def download_data_from_join(self, file_name: str, meta_file: str) -> [xr.DataArray, pd.DataFrame]:
+        """
+        Download data from the TOAR database using the JOIN interface.
+
+        :param file_name: name of the file to store the downloaded data in
+        :param meta_file: name of the file to store the meta data in
+        :return: downloaded data and its meta data
+        """
+        df_all = {}
+        df, meta = join.download_join(station_name=self.station, statvar=self.statistics_per_var)
+        df_all[self.station[0]] = df
+        # convert df_all to xarray
+        xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
+        xarr = xr.Dataset(xarr).to_array(dim='Stations')
+        if self.kwargs.get('store_data_locally', True):
+            # save locally as nc/csv file
+            xarr.to_netcdf(path=file_name)
+            meta.to_csv(meta_file)
+        return xarr, meta
+
+    def _set_file_name(self):
+        return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(sorted(self.variables))}.nc")
+
+    def _set_meta_file_name(self):
+        return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(sorted(self.variables))}_meta.csv")
+
+    def __repr__(self):
+        return f"DataPrep(path='{self.path}', network='{self.network}', station={self.station}, " \
+               f"variables={self.variables}, **{self.kwargs})"
+
+    def check_path_and_create(self):
+        try:
+            os.makedirs(self.path)
+            logging.info(f"Created path: {self.path}")
+        except FileExistsError:
+            logging.info(f"Path already exists: {self.path}")
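As a concrete example of the naming scheme implemented by `_set_file_name` and `_set_meta_file_name` (variables are sorted and joined with underscores):

```python
# station ['DEBW107'] with variables ['temp', 'o3'] resolves to
'DEBW107_o3_temp.nc'        # data file
'DEBW107_o3_temp_meta.csv'  # meta data file
```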
+
+    def interpolate(self, dim: str, method: str = 'linear', limit: int = None,
+                    use_coordinate: Union[bool, str] = True, **kwargs):
+        """
+        (Copied from dataarray.interpolate_na)
+        Interpolate values according to different methods.
+
+        :param dim:
+                Specifies the dimension along which to interpolate.
+        :param method:
+                {'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic',
+                'polynomial', 'barycentric', 'krog', 'pchip', 'spline', 'akima'}, optional
+                String indicating which method to use for interpolation:
+
+                - 'linear': linear interpolation (Default). Additional keyword
+                  arguments are passed to ``numpy.interp``
+                - 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'polynomial':
+                  are passed to ``scipy.interpolate.interp1d``. If method=='polynomial',
+                  the ``order`` keyword argument must also be provided.
+                - 'barycentric', 'krog', 'pchip', 'spline', and 'akima': use their
+                  respective ``scipy.interpolate`` classes.
+        :param limit:
+                default None
+                Maximum number of consecutive NaNs to fill. Must be greater than 0
+                or None for no limit.
+        :param use_coordinate:
+                default True
+                Specifies which index to use as the x values in the interpolation
+                formulated as `y = f(x)`. If False, values are treated as if
+                equally-spaced along `dim`. If True, the IndexVariable `dim` is
+                used. If use_coordinate is a string, it specifies the name of a
+                coordinate variable to use as the index.
+        :param kwargs:
+        :return: None; self.data is replaced by the interpolated data
+        """
+
+        self.data = self.data.interpolate_na(dim=dim, method=method, limit=limit, use_coordinate=use_coordinate,
+                                             **kwargs)
+
+    @staticmethod
+    def check_inverse_transform_params(mean, std, method) -> None:
+        msg = ""
+        if method in ['standardise', 'centre'] and mean is None:
+            msg += "mean, "
+        if method == 'standardise' and std is None:
+            msg += "std, "
+        if len(msg) > 0:
+            raise AttributeError(f"Inverse transform {method} can not be executed because the following is None: "
+                                 f"{msg}")
+
+    def inverse_transform(self) -> None:
+        """
+        Perform the inverse transformation of the transform method stored in self._transform_method.
+        :return: None; self.data is replaced by the inverse transformed data, mean, std and method are reset
+        """
+        def f_inverse(data, mean, std, method_inverse):
+            if method_inverse == 'standardise':
+                return statistics.standardise_inverse(data, mean, std), None, None
+            elif method_inverse == 'centre':
+                return statistics.centre_inverse(data, mean), None, None
+            elif method_inverse == 'normalise':
+                raise NotImplementedError
+            else:
+                raise NotImplementedError
+
+        if self._transform_method is None:
+            raise AssertionError("Inverse transformation method is not set. Data cannot be inverse transformed.")
+        self.check_inverse_transform_params(self.mean, self.std, self._transform_method)
+        self.data, self.mean, self.std = f_inverse(self.data, self.mean, self.std, self._transform_method)
+        self._transform_method = None
+
+    def transform(self, dim: Union[str, int] = 0, method: str = 'standardise', inverse: bool = False) -> None:
+        """
+        This method transforms an xarray.DataArray (along dim) or pandas.DataFrame (along axis) either to mean=0 and
+        std=1 (`method=standardise`) or centres the data to mean=0 without changing the data scale (`method=centre`).
+        Furthermore, it sets an internal instance attribute for a later inverse transformation. The method raises an
+        AssertionError if an internal transform method was already set (`inverse=False`) or if the internal transform
+        method, internal mean and internal standard deviation weren't set (`inverse=True`).
+
+        :param string/int dim: This param is not used for the inverse transformation.
+                | for xarray.DataArray as string: name of dimension which should be standardised
+                | for pandas.DataFrame as int: axis of dimension which should be standardised
+        :param method: Choose the transformation method from 'standardise' and 'centre'. 'normalise' is not
+            implemented yet. This param is not used for the inverse transformation.
+        :param inverse: Switch between transformation and inverse transformation.
+        :return: None; the results are stored in the instance attributes:
+            #. mean: mean of data
+            #. std: standard deviation of data
+            #. data: transformed data
+        """
+
+        def f(data):
+            if method == 'standardise':
+                return statistics.standardise(data, dim)
+            elif method == 'centre':
+                return statistics.centre(data, dim)
+            elif method == 'normalise':
+                # use min/max of data or given min/max
+                raise NotImplementedError
+            else:
+                raise NotImplementedError
+
+        if not inverse:
+            if self._transform_method is not None:
+                raise AssertionError(f"Transform method is already set. Therefore, data was already transformed with "
+                                     f"{self._transform_method}. Please perform inverse transformation of data first.")
+            self.mean, self.std, self.data = f(self.data)
+            self._transform_method = method
+        else:
+            self.inverse_transform()
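A short sketch of the intended transform round trip (continuing the `dp` instance from the sketch above):

```python
dp.transform('datetime')  # standardise along time; stores dp.mean, dp.std and dp._transform_method
dp.inverse_transform()    # restores the original data and resets mean, std and method
```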
+
+    def make_history_window(self, dim: str, window: int) -> None:
+        """
+        This method shifts the data window+1 times and creates an xarray which has a new dimension 'window'
+        containing the shifted data. This is used to represent history in the data. Results are stored in
+        self.history.
+
+        :param dim: dimension along which the shift is applied
+        :param window: number of time steps to look back in history
+                Note: window will be treated as a negative value. This should be in agreement with looking back on
+                a time line. Nonetheless, positive values are allowed but will be converted to their negative
+                expression.
+        """
+        window = -abs(window)
+        self.history = self.shift(dim, window)
+
+    def shift(self, dim: str, window: int) -> xr.DataArray:
+        """
+        This method uses xarray's shift function multiple times to represent history (if window <= 0) or lead time
+        (if window > 0).
+        :param dim: dimension along which the shift is applied
+        :param window: number of steps to shift (corresponds to the window length)
+        :return: shifted data concatenated along a new 'window' dimension
+        """
+        start = 1
+        end = 1
+        if window <= 0:
+            start = window
+        else:
+            end = window + 1
+        res = []
+        for w in range(start, end):
+            res.append(self.data.shift({dim: -w}))
+        window_array = self.create_index_array('window', range(start, end))
+        res = xr.concat(res, dim=window_array)
+        return res
+
+    def make_labels(self, dim_name_of_target: str, target_var: str, dim_name_of_shift: str, window: int) -> None:
+        """
+        This method creates an xarray.DataArray containing the labels.
+
+        :param dim_name_of_target: name of the dimension which contains the target variable
+        :param target_var: name of the target variable in 'dimension'
+        :param dim_name_of_shift: name of the dimension on which xarray.DataArray.shift will be applied
+        :param window: lead time of the label
+        """
+        window = abs(window)
+        self.label = self.shift(dim_name_of_shift, window).sel({dim_name_of_target: target_var})
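To make the window semantics concrete (continuing the `dp` sketch from above; the window lengths are arbitrary):

```python
dp.make_history_window('datetime', 2)             # treated as -2; 'window' coordinate becomes [-2, -1, 0]
dp.make_labels('variables', 'o3', 'datetime', 3)  # 'window' coordinate of the label becomes [1, 2, 3]
```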
+
+    def history_label_nan_remove(self, dim: str) -> None:
+        """
+        All slices along dim which contain NaNs in self.history or self.label are removed from both data sets.
+        This is done to present only a full matrix to keras.fit.
+
+        :param dim: dimension along which the NaN-containing slices are removed
+        :return: None; self.history and self.label are reduced to their common NaN-free time steps
+        """
+        intersect = []
+        if (self.history is not None) and (self.label is not None):
+            non_nan_history = self.history.dropna(dim=dim)
+            non_nan_label = self.label.dropna(dim=dim)
+            intersect = np.intersect1d(non_nan_history.coords[dim].values,
+                                       non_nan_label.coords[dim].values)
+
+        if len(intersect) == 0:
+            self.history = None
+            self.label = None
+        else:
+            self.history = self.history.sel({dim: intersect})
+            self.label = self.label.sel({dim: intersect})
+
+    @staticmethod
+    def create_index_array(index_name: str, index_value: Iterable[int]) -> xr.DataArray:
+        """
+        This method creates a 1D xarray.DataArray with the given index name and values.
+
+        :param index_name: name of the new index dimension
+        :param index_value: values of the new index
+        :return: 1D xarray.DataArray usable as index for xr.concat
+        """
+        ind = pd.DataFrame({'val': index_value}, index=index_value)
+        res = xr.Dataset.from_dataframe(ind).to_array().rename({'index': index_name}).squeeze(dim='variable',
+                                                                                              drop=True)
+        res.name = index_name
+        return res
+
+    def _slice_prep(self, data: xr.DataArray, coord: str = 'datetime') -> xr.DataArray:
+        """
+        This method prepares all settings for slicing and executes _slice.
+        :param data: data to slice
+        :param coord: name of the axis to slice along
+        :return: sliced data
+        """
+        start = self.kwargs.get('start', data.coords[coord][0].values)
+        end = self.kwargs.get('end', data.coords[coord][-1].values)
+        return self._slice(data, start, end, coord)
+
+    @staticmethod
+    def _slice(data: xr.DataArray, start: Union[date, str], end: Union[date, str], coord: str) -> xr.DataArray:
+        """
+        This method slices a given data item along coord (for example, select only the values of 2011).
+        :param data: data to slice
+        :param start: first value of the slice
+        :param end: last value of the slice (inclusive)
+        :param coord: name of the axis to slice along
+        :return: sliced data
+        """
+        return data.loc[{coord: slice(start, end)}]
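For example, restricting the data cube to a fixed period through the kwargs evaluated in `_slice_prep` (dates are arbitrary; `.loc` slicing is inclusive on both ends):

```python
dp.kwargs['start'] = '1997-01-01'
dp.kwargs['end'] = '1997-01-10'
sliced = dp._slice_prep(dp.data)  # keeps exactly the ten days from start to end
```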
+
+    def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray:
+        """
+        This method sets all negative concentrations to the given minimum (default: zero). The names of all
+        concentrations are extracted from
+        https://join.fz-juelich.de/services/rest/surfacedata/ #2.1 Parameters
+        :param data: data to check
+        :param minimum: minimal allowed value; all smaller values are clipped to it
+        :return: corrected data
+        """
+        chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5",
+                     "propane", "so2", "toluene"]
+        used_chem_vars = list(set(chem_vars) & set(self.variables))
+        data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum)
+        return data
+
+
+if __name__ == "__main__":
+
+    dp = DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
+    print(dp)
diff --git a/src/helpers.py b/src/helpers.py
new file mode 100644
index 0000000000000000000000000000000000000000..424b2fb519726adde9d8d30fb610379f9b4dfed3
--- /dev/null
+++ b/src/helpers.py
@@ -0,0 +1,8 @@
+__author__ = 'Lukas Leufen'
+__date__ = '2019-10-21'
+
+
+def to_list(arg):
+    if not isinstance(arg, list):
+        arg = [arg]
+    return arg
diff --git a/src/join.py b/src/join.py
new file mode 100644
index 0000000000000000000000000000000000000000..4909ec267d06e6739fd61d9ee187cf9f7aec922a
--- /dev/null
+++ b/src/join.py
@@ -0,0 +1,110 @@
+__author__ = 'Felix Kleinert, Lukas Leufen'
+__date__ = '2019-10-16'
+
+
+import requests
+import logging
+import pandas as pd
+import datetime as dt
+from typing import Iterator, Union, List
+
+join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
+logging.basicConfig(level=logging.INFO)
+
+
+def download_join(station_name: Union[str, List[str]], statvar: dict) -> [pd.DataFrame, pd.DataFrame]:
+
+    """
+    read data from JOIN/TOAR
+    :param station_name: station name, e.g. DEBY122
+    :param statvar: keys as variables like 'O3', values as statistics on the keys like 'mean'
+    :returns:
+        - df - pandas DataFrame with all variables and statistics
+        - meta - pandas DataFrame with all meta information
+    """
+    # make sure station_name parameter is a list
+    if not isinstance(station_name, list):
+        station_name = [station_name]
+
+    # load series information
+    opts = {'base': join_url_base, 'service': 'series', 'station_id': station_name[0]}
+    url = create_url(**opts)
+    response = requests.get(url)
+    station_vars = response.json()
+    vars_dict = {item[3].lower(): item[0] for item in station_vars}
+
+    # download all variables with given statistic
+    data = None
+    df = None
+    for var in _lower_list(sorted(vars_dict.keys())):
+        if var in statvar.keys():
+            logging.info('load: {}'.format(var))
+
+            # create data link
+            opts = {'base': join_url_base, 'service': 'stats', 'id': vars_dict[var], 'statistics': statvar[var],
+                    'sampling': 'daily', 'capture': 0, 'min_data_length': 1460}
+            url = create_url(**opts)
+
+            # load data
+            response = requests.get(url)
+            data = response.json()
+
+            # correct namespace of statistics
+            stat = _correct_stat_name(statvar[var])
+
+            # store data in pandas dataframe
+            index = map(lambda s: dt.datetime.strptime(s, "%Y-%m-%d %H:%M"), data['datetime'])
+            if df is None:
+                df = pd.DataFrame(data[stat], index=index, columns=[var])
+            else:
+                df = pd.concat([df, pd.DataFrame(data[stat], index=index, columns=[var])], axis=1)
+            logging.debug('finished: {}'.format(var))
+
+    if data:
+        meta = pd.DataFrame.from_dict(data['metadata'], orient='index')
+        meta.columns = station_name
+        return df, meta
+    else:
+        raise ValueError("No data found in JOIN.")
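The two requests above are plain REST calls whose URLs are assembled by `create_url` (defined below); for the series lookup of the example station, the generated URL reads:

```python
create_url(join_url_base, 'series', station_id='DEBW107')
# -> 'https://join.fz-juelich.de/services/rest/surfacedata/series/?station_id=DEBW107'
```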
+
+
+def _correct_stat_name(stat: str) -> str:
+    """
+    Map a given statistic name to the new namespace defined by the mapping dict. Returns the given name stat if it is
+    not an element of the mapping namespace.
+    :param stat: statistic name in the JOIN server namespace
+    :return: stat mapped to the local namespace
+    """
+    mapping = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'}
+    return mapping.get(stat, stat)
+
+
+def _lower_list(args: List[str]) -> Iterator[str]:
+    """
+    lower all elements of the given list
+    :param args: list with string entries to lower
+    :return: iterator that lowers all list entries
+    """
+    for string in args:
+        yield string.lower()
+
+
+def create_url(base: str, service: str, **kwargs: Union[str, int, float]) -> str:
+    """
+    create a request url from a given base url, a service type and arbitrarily many additional keyword arguments
+    :param base: basic url of the rest service
+    :param service: service type, e.g. series, stats
+    :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum'
+    :return: combined url as string
+    """
+    url = '{}{}/?'.format(base, service) + '&'.join('{}={}'.format(k, v) for k, v in kwargs.items())
+    return url
+
+
+if __name__ == "__main__":
+    var_all_dic = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
+                   'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
+                   'pblheight': 'maximum'}
+    station = 'DEBW107'
+    download_join(station, var_all_dic)
diff --git a/src/statistics.py b/src/statistics.py
new file mode 100644
index 0000000000000000000000000000000000000000..060081de9e21f5cbc7c560066451bbdbf14b7eb1
--- /dev/null
+++ b/src/statistics.py
@@ -0,0 +1,60 @@
+__author__ = 'Lukas Leufen'
+__date__ = '2019-10-23'
+
+import xarray as xr
+import pandas as pd
+from typing import Union, Tuple
+
+
+Data = Union[xr.DataArray, pd.DataFrame]
+
+
+def standardise(data: Data, dim: Union[str, int]) -> Tuple[Data, Data, Data]:
+    """
+    This function standardises an xarray.DataArray (along dim) or pandas.DataFrame (along axis) to mean=0 and std=1.
+    :param data: data to standardise
+    :param string/int dim:
+            | for xarray.DataArray as string: name of dimension which should be standardised
+            | for pandas.DataFrame as int: axis of dimension which should be standardised
+    :return: xarray.DataArrays or pandas.DataFrames:
+        #. mean: mean of data
+        #. std: standard deviation of data
+        #. data: standardised data
+    """
+    return data.mean(dim), data.std(dim), (data - data.mean(dim)) / data.std(dim)
+
+
+def standardise_inverse(data: Data, mean: Data, std: Data) -> Data:
+    """
+    This is the inverse function of `standardise` and therefore reverses the standardisation.
+    :param data: standardised data
+    :param mean: mean of the original data
+    :param std: standard deviation of the original data
+    :return: original (unstandardised) data
+    """
+    return data * std + mean
+
+
+def centre(data: Data, dim: Union[str, int]) -> Tuple[Data, None, Data]:
+    """
+    This function centres an xarray.DataArray (along dim) or pandas.DataFrame (along axis) to mean=0.
+    :param data: data to centre
+    :param string/int dim:
+            | for xarray.DataArray as string: name of dimension which should be centred
+            | for pandas.DataFrame as int: axis of dimension which should be centred
+    :return: xarray.DataArrays or pandas.DataFrames:
+        #. mean: mean of data
+        #. std: None (kept to match the interface of `standardise`)
+        #. data: centred data
+    """
+    return data.mean(dim), None, data - data.mean(dim)
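`standardise`/`standardise_inverse` (and likewise `centre`/`centre_inverse` below) form an exact round trip, which the test suite verifies; a minimal sketch with toy data:

```python
import numpy as np
import pandas as pd
from src.statistics import standardise, standardise_inverse

df = pd.DataFrame(np.random.normal(2, 2, (100, 3)))   # toy data
mean, std, z = standardise(df, 0)                     # z = (df - mean) / std
recovered = standardise_inverse(z, mean, std)         # z * std + mean
np.testing.assert_array_almost_equal(df, recovered)   # round trip is lossless
```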
+
+
+def centre_inverse(data: Data, mean: Data) -> Data:
+    """
+    This function is the inverse of `centre` and therefore adds the given mean values back to the data.
+    :param data: centred data
+    :param mean: mean of the original data
+    :return: original (uncentred) data
+    """
+    return data + mean
diff --git a/test/test_data_preparation.py b/test/test_data_preparation.py
new file mode 100644
index 0000000000000000000000000000000000000000..0e0984f096fd444fb76f29184df8b1d85a046756
--- /dev/null
+++ b/test/test_data_preparation.py
@@ -0,0 +1,243 @@
+import pytest
+import os
+from src.data_preparation import DataPrep
+import logging
+import numpy as np
+import xarray as xr
+import datetime as dt
+import pandas as pd
+from operator import itemgetter
+
+
+class TestDataPrep:
+
+    @pytest.fixture
+    def data(self):
+        return DataPrep('test/data/', 'dummy', 'DEBW107', ['o3', 'temp'], test='testKWARGS',
+                        statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
+
+    def test_init(self, data):
+        assert data.path == os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
+        assert data.network == 'dummy'
+        assert data.station == ['DEBW107']
+        assert data.variables == ['o3', 'temp']
+        assert data.statistics_per_var == {'o3': 'dma8eu', 'temp': 'maximum'}
+        assert not all([data.mean, data.std, data.history, data.label])
+        assert {'test': 'testKWARGS'}.items() <= data.kwargs.items()
+
+    def test_init_no_stats(self):
+        with pytest.raises(NotImplementedError):
+            DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'])
+
+    def test_check_path_and_create(self, caplog):
+        caplog.set_level(logging.INFO)
+        d = object.__new__(DataPrep)
+        d.path = 'data/test'
+        assert not os.path.exists('data/test')
+        d.check_path_and_create()
+        assert os.path.exists('data/test')
+        assert caplog.messages[0] == "Created path: data/test"
+        d.check_path_and_create()
+        assert caplog.messages[1] == "Path already exists: data/test"
+        os.rmdir('data/test')
+
+    def test_repr(self):
+        d = object.__new__(DataPrep)
+        d.path = 'data/test'
+        d.network = 'dummy'
+        d.station = ['DEBW107']
+        d.variables = ['o3', 'temp']
+        d.kwargs = None
+        assert d.__repr__().rstrip() == "DataPrep(path='data/test', network='dummy', station=['DEBW107'], " \
+                                        "variables=['o3', 'temp'], **None)".rstrip()
+
+    def test_set_file_name_and_meta(self):
+        d = object.__new__(DataPrep)
+        d.path = os.path.abspath('test/data/test')
+        d.station = 'TESTSTATION'
+        d.variables = ['a', 'bc']
+        assert d._set_file_name() == os.path.join(os.path.abspath(os.path.dirname(__file__)),
+                                                  "data/test/TESTSTATION_a_bc.nc")
+        assert d._set_meta_file_name() == os.path.join(os.path.abspath(os.path.dirname(__file__)),
+                                                       "data/test/TESTSTATION_a_bc_meta.csv")
+
+    @pytest.mark.parametrize('opts', [{'dim': 'datetime', 'method': 'nearest', 'limit': 10, 'use_coordinate': True},
+                                      {'dim': 'datetime', 'limit': 5}, {'dim': 'datetime'}])
+    def test_interpolate(self, data, opts):
+        data_org = data.data
+        data.interpolate(**opts)
+        # set default params if empty
+        opts["method"] = opts.get("method", 'linear')
+        opts["limit"] = opts.get("limit", None)
+        opts["use_coordinate"] = opts.get("use_coordinate", True)
+        assert xr.testing.assert_equal(data_org.interpolate_na(**opts), data.data) is None
+
+    def test_transform_standardise(self, data):
+        assert data._transform_method is None
+        assert data.mean is None
+        assert data.std is None
+        data.transform('datetime')
+        assert data._transform_method == 'standardise'
+        assert np.testing.assert_almost_equal(data.data.mean('datetime').variable.values, np.array([[0, 0]])) is None
+        assert np.testing.assert_almost_equal(data.data.std('datetime').variable.values, np.array([[1, 1]])) is None
+        assert isinstance(data.mean, xr.DataArray)
+        assert isinstance(data.std, xr.DataArray)
+
+    @pytest.mark.parametrize('mean, std, method, msg', [(10, 3, 'standardise', ''), (6, None, 'standardise', 'std, '),
+                                                        (None, 3, 'standardise', 'mean, '), (19, None, 'centre', ''),
+                                                        (None, 2, 'centre', 'mean, '), (8, 2, 'centre', ''),
+                                                        (None, None, 'standardise', 'mean, std, ')])
+    def test_check_inverse_transform_params(self, data, mean, std, method, msg):
+        if len(msg) > 0:
+            with pytest.raises(AttributeError) as e:
+                data.check_inverse_transform_params(mean, std, method)
+            assert msg in e.value.args[0]
+        else:
+            assert data.check_inverse_transform_params(mean, std, method) is None
+
+    def test_transform_centre(self, data):
+        assert data._transform_method is None
+        assert data.mean is None
+        assert data.std is None
+        data_std_org = data.data.std('datetime').variable.values
+        data.transform('datetime', 'centre')
+        assert data._transform_method == 'centre'
+        assert np.testing.assert_almost_equal(data.data.mean('datetime').variable.values, np.array([[0, 0]])) is None
+        assert np.testing.assert_almost_equal(data.data.std('datetime').variable.values, data_std_org) is None
+        assert data.std is None
+
+    @pytest.mark.parametrize('method', ['standardise', 'centre'])
+    def test_transform_inverse(self, data, method):
+        data_org = data.data
+        data.transform('datetime', method)
+        data.inverse_transform()
+        assert data._transform_method is None
+        assert data.mean is None
+        assert data.std is None
+        assert np.testing.assert_array_almost_equal(data_org, data.data) is None
+        data.transform('datetime', method)
+        data.transform('datetime', inverse=True)
+        assert data._transform_method is None
+        assert data.mean is None
+        assert data.std is None
+        assert np.testing.assert_array_almost_equal(data_org, data.data) is None
+
+    @pytest.mark.parametrize('method', ['normalise', 'unknownmethod'])
+    def test_transform_errors(self, data, method):
+        with pytest.raises(NotImplementedError):
+            data.transform('datetime', method)
+        data._transform_method = method
+        with pytest.raises(AssertionError) as e:
+            data.transform('datetime', method)
+        assert "Transform method is already set." in e.value.args[0]
+
+    @pytest.mark.parametrize('method', ['normalise', 'unknownmethod'])
+    def test_transform_inverse_errors(self, data, method):
+        with pytest.raises(AssertionError) as e:
+            data.inverse_transform()
+        assert "Inverse transformation method is not set." in e.value.args[0]
+        data.mean = 1
+        data.std = 1
+        data._transform_method = method
+        with pytest.raises(NotImplementedError):
+            data.inverse_transform()
+
+    def test_nan_remove_no_hist_or_label(self, data):
+        assert data.history is None
+        assert data.label is None
+        data.history_label_nan_remove('datetime')
+        assert data.history is None
+        assert data.label is None
+        data.make_history_window('datetime', 6)
+        assert data.history is not None
+        data.history_label_nan_remove('datetime')
+        assert data.history is None
+        data.make_labels('variables', 'o3', 'datetime', 2)
+        assert data.label is not None
+        data.history_label_nan_remove('datetime')
+        assert data.label is None
+
+    def test_nan_remove(self, data):
+        data.make_history_window('datetime', -12)
+        data.make_labels('variables', 'o3', 'datetime', 3)
+        shape = data.history.shape
+        data.history_label_nan_remove('datetime')
+        assert data.history.isnull().sum() == 0
+        assert itemgetter(0, 1, 3)(shape) == itemgetter(0, 1, 3)(data.history.shape)
+        assert shape[2] >= data.history.shape[2]
+
+    def test_create_index_array(self, data):
+        index_array = data.create_index_array('window', range(1, 4))
+        assert np.testing.assert_array_equal(index_array.data, [1, 2, 3]) is None
+        assert index_array.name == 'window'
+        assert index_array.coords.dims == ('window',)
+        index_array = data.create_index_array('window', range(0, 1))
+        assert np.testing.assert_array_equal(index_array.data, [0]) is None
+        assert index_array.name == 'window'
+        assert index_array.coords.dims == ('window',)
+
+    @staticmethod
+    def extract_window_data(res, orig, w):
+        slice = {'variables': ['temp'], 'Stations': 'DEBW107', 'datetime': dt.datetime(1997, 1, 6)}
+        window = res.sel(slice).data.flatten()
+        if w <= 0:
+            delta = w
+            w = abs(w) + 1
+        else:
+            delta = 1
+        slice = {'variables': ['temp'], 'Stations': 'DEBW107',
+                 'datetime': pd.date_range(dt.date(1997, 1, 6) + dt.timedelta(days=delta), periods=w, freq='D')}
+        orig_slice = orig.sel(slice).data.flatten()
+        return window, orig_slice
+
+    def test_shift(self, data):
+        res = data.shift('datetime', 4)
+        window, orig = self.extract_window_data(res, data.data, 4)
+        assert res.coords.dims == ('window', 'Stations', 'datetime', 'variables')
+        assert list(res.data.shape) == [4] + list(data.data.shape)
+        assert np.testing.assert_array_equal(orig, window) is None
+        res = data.shift('datetime', -3)
+        window, orig = self.extract_window_data(res, data.data, -3)
+        assert list(res.data.shape) == [4] + list(data.data.shape)
+        assert np.testing.assert_array_equal(orig, window) is None
+        res = data.shift('datetime', 0)
+        window, orig = self.extract_window_data(res, data.data, 0)
+        assert list(res.data.shape) == [1] + list(data.data.shape)
+        assert np.testing.assert_array_equal(orig, window) is None
+
+    def test_make_history_window(self, data):
+        assert data.history is None
+        data.make_history_window('datetime', 5)
+        assert data.history is not None
+        save_history = data.history
+        data.make_history_window('datetime', -5)
+        assert np.testing.assert_array_equal(data.history, save_history) is None
+
+    def test_make_labels(self, data):
+        assert data.label is None
+        data.make_labels('variables', 'o3', 'datetime', 3)
+        assert data.label.variables.data == 'o3'
+        assert list(data.label.shape) == [3] + list(data.data.shape)[:2]
+        save_label = data.label
+        data.make_labels('variables', 'o3', 'datetime', -3)
+        assert np.testing.assert_array_equal(data.label, save_label) is None
+
+    def test_slice(self, data):
+        res = data._slice(data.data, dt.date(1997, 1, 1), dt.date(1997, 1, 10), 'datetime')
+        assert itemgetter(0, 2)(res.shape) == itemgetter(0, 2)(data.data.shape)
+        assert res.shape[1] == 10
+
+    def test_slice_prep(self, data):
+        res = data._slice_prep(data.data)
+        assert res.shape == data.data.shape
+        data.kwargs['start'] = res.coords['datetime'][0].values
+        data.kwargs['end'] = res.coords['datetime'][9].values
+        res = data._slice_prep(data.data)
+        assert itemgetter(0, 2)(res.shape) == itemgetter(0, 2)(data.data.shape)
+        assert res.shape[1] == 10
+
+    def test_check_for_neg_concentrations(self, data):
+        res = data.check_for_negative_concentrations(data.data)
+        assert res.sel({'variables': 'o3'}).min() >= 0
+        res = data.check_for_negative_concentrations(data.data, minimum=2)
+        assert res.sel({'variables': 'o3'}).min() >= 2
diff --git a/test/test_statistics.py b/test/test_statistics.py
new file mode 100644
index 0000000000000000000000000000000000000000..d31f4e9919da27e679019b892d98557dee9a7f1d
--- /dev/null
+++ b/test/test_statistics.py
@@ -0,0 +1,60 @@
+import pytest
+import xarray as xr
+import pandas as pd
+import numpy as np
+from src.statistics import standardise, standardise_inverse, centre, centre_inverse
+
+
+@pytest.fixture(scope='module')
+def input_data():
+    return np.array([np.random.normal(2, 2, 2000),
+                     np.random.normal(-5, 3, 2000),
+                     np.random.normal(10, 1, 2000)]).T
+
+
+@pytest.fixture(scope='module')
+def pandas(input_data):
+    return pd.DataFrame(input_data)
+
+
+@pytest.fixture(scope='module')
+def xarray(input_data):
+    return xr.DataArray(input_data, dims=['index', 'value'])
+
+
+class TestStandardise:
+
+    @pytest.mark.parametrize('data_org, dim', [(pytest.lazy_fixture('pandas'), 0),
+                                               (pytest.lazy_fixture('xarray'), 'index')])
+    def test_standardise(self, data_org, dim):
+        mean, std, data = standardise(data_org, dim)
+        assert np.testing.assert_almost_equal(mean, [2, -5, 10], decimal=1) is None
+        assert np.testing.assert_almost_equal(std, [2, 3, 1], decimal=1) is None
+        assert np.testing.assert_almost_equal(data.mean(dim), [0, 0, 0]) is None
+        assert np.testing.assert_almost_equal(data.std(dim), [1, 1, 1]) is None
+
+    @pytest.mark.parametrize('data_org, dim', [(pytest.lazy_fixture('pandas'), 0),
+                                               (pytest.lazy_fixture('xarray'), 'index')])
+    def test_standardise_inverse(self, data_org, dim):
+        mean, std, data = standardise(data_org, dim)
+        data_recovered = standardise_inverse(data, mean, std)
+        assert np.testing.assert_array_almost_equal(data_org, data_recovered) is None
+
+
+class TestCentre:
+
+    @pytest.mark.parametrize('data_org, dim', [(pytest.lazy_fixture('pandas'), 0),
+                                               (pytest.lazy_fixture('xarray'), 'index')])
+    def test_centre(self, data_org, dim):
+        mean, std, data = centre(data_org, dim)
+        assert np.testing.assert_almost_equal(mean, [2, -5, 10], decimal=1) is None
+        assert std is None
+        assert np.testing.assert_almost_equal(data.mean(dim), [0, 0, 0]) is None
+
+    @pytest.mark.parametrize('data_org, dim', [(pytest.lazy_fixture('pandas'), 0),
+                                               (pytest.lazy_fixture('xarray'), 'index')])
+    def test_centre_inverse(self, data_org, dim):
+        mean, _, data = centre(data_org, dim)
+        data_recovered = centre_inverse(data, mean)
+        assert np.testing.assert_array_almost_equal(data_org, data_recovered) is None
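Taken together, the intended end-to-end flow of the new DataPrep class looks as follows (a sketch: station and statistics mirror the `__main__` example in src/data_preparation.py, the window lengths are arbitrary):

```python
from src.data_preparation import DataPrep

dp = DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'],
              statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
dp.interpolate('datetime', limit=1)               # fill single missing days
dp.transform('datetime')                          # standardise to mean=0, std=1
dp.make_history_window('datetime', 7)             # X: one week of history
dp.make_labels('variables', 'o3', 'datetime', 3)  # y: o3 up to three days ahead
dp.history_label_nan_remove('datetime')           # keep only complete (X, y) pairs
```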