Commit 2ed36ebf authored by Lukas Leufen
Merge branch 'lukas_issue002_feat_data-prep-class' into 'develop'

data prep class

See merge request toar/machinelearningtools!4
parents 088d0259 9d10416a
# .coveragerc to control coverage.py
[run]
branch = True
[report]
# Regexes for lines to exclude from consideration
exclude_lines =
# Have to re-enable the standard pragma
pragma: no cover
# Don't complain about missing debug-only code:
def __repr__
if self\.debug
# Don't complain if tests don't hit defensive assertion code:
raise AssertionError
raise NotImplementedError
# Don't complain if non-runnable code isn't run:
if 0:
if __name__ == .__main__.:
# Don't complain about import statements
import
@@ -40,10 +40,18 @@ Thumbs.db
.idea/
/venv/
# check plot folder #
#####################
# don't check data and plot folder #
####################################
/data/
/plots/
# tmp folder #
##############
/tmp/
# test related data #
#####################
.coverage
htmlcov/
.pytest_cache
/test/data/
Keras==2.2.4
numpy==1.15.4
tensorflow==1.12.0
xarray==0.14.0
pandas==0.25.1
requests==2.22.0
pytest==5.2.1
pytest-lazy-fixture==0.6.1
pytest-cov
pydot
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-16'
import xarray as xr
import pandas as pd
import numpy as np
import logging
import os
from src import join, helpers
from src import statistics
from typing import Union, List, Iterable
import datetime as dt
# define a more general date type for type hinting
date = Union[dt.date, dt.datetime]
class DataPrep(object):
"""
This class prepares data to be used in neural networks. The instance searches for locally stored data that meets
the given demands. If no local data is found, the DataPrep instance loads data from the TOAR database and stores
it locally for reuse. For the moment, only daily aggregated time series are supported. The aggregation can be set
manually and can differ for each variable.
After data loading, different pre-processing steps can be executed to prepare the data for further applications.
In particular, the following methods can be used for the pre-processing step:
- interpolate: interpolate between data points by using xarray's interpolation method
- standardise: standardise data to mean=0 and std=1, or centre to mean=0; additional methods like normalise on
the interval [0, 1] are not implemented yet.
- make window history: represent the history (preceding time steps) for training/testing; X
- make labels: create the target vector with a given lead time for training/testing; y
- remove NaNs jointly from the desired input and output; only keeps time steps where no NaNs are present in X AND
y. Use this method after the creation of the window history and labels to clean up the data cube.
To create a DataPrep instance, specify the station by id (e.g. "DEBW107"), its network (e.g. UBA,
"Umweltbundesamt") and the variables to use. Further options can be set on the instance:
* `statistics_per_var`: define a specific statistic to extract from the TOAR database for each variable.
* `start`: define a start date for the data cube creation. Default: use the first entry of the time series.
* `end`: set the end date for the data cube. Default: use the last date of the time series.
* `store_data_locally`: store recently downloaded data on local disk. Default: True
* set further parameters for xarray's interpolation methods to modify the interpolation scheme
"""
def __init__(self, path: str, network: str, station: Union[str, List[str]], variables: List[str], **kwargs):
self.path = os.path.abspath(path)
self.network = network
self.station = helpers.to_list(station)
self.variables = variables
self.mean = None
self.std = None
self.history = None
self.label = None
self.kwargs = kwargs
self.data = None
self.meta = None
self._transform_method = None
self.statistics_per_var = kwargs.get("statistics_per_var", None)
if self.statistics_per_var is not None:
self.load_data()
else:
raise NotImplementedError # hourly data usage is not implemented yet
# self.data, self.meta = Fkf.read_hourly_data_from_csv_to_xarray(self.path, self.network, self.station,
# self.variables, **kwargs)
def load_data(self):
"""
Load data and meta data either from local disk (preferred) or download new data from the TOAR database if no
local data is available. In the latter case, the downloaded data is stored locally if desired (default: True).
"""
self.check_path_and_create()
file_name = self._set_file_name()
meta_file = self._set_meta_file_name()
try:
data = self._slice_prep(xr.open_dataarray(file_name))
self.data = self.check_for_negative_concentrations(data)
self.meta = pd.read_csv(meta_file, index_col=0)
except FileNotFoundError as e:
logging.warning(e)
data, self.meta = self.download_data_from_join(file_name, meta_file)
data = self._slice_prep(data)
self.data = self.check_for_negative_concentrations(data)
def download_data_from_join(self, file_name: str, meta_file: str) -> [xr.DataArray, pd.DataFrame]:
"""
Download data from the TOAR database using the JOIN interface.
:param file_name: local file path for storing the downloaded data (netCDF)
:param meta_file: local file path for storing the meta data (csv)
:return: downloaded data and its meta data
"""
df_all = {}
df, meta = join.download_join(station_name=self.station, statvar=self.statistics_per_var)
df_all[self.station[0]] = df
# convert df_all to xarray
xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
xarr = xr.Dataset(xarr).to_array(dim='Stations')
if self.kwargs.get('store_data_locally', True):
# save locally as nc/csv file
xarr.to_netcdf(path=file_name)
meta.to_csv(meta_file)
return xarr, meta
def _set_file_name(self):
return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(sorted(self.variables))}.nc")
def _set_meta_file_name(self):
return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(sorted(self.variables))}_meta.csv")
def __repr__(self):
return f"Dataprep(path='{self.path}', network='{self.network}', station={self.station}, " \
f"variables={self.variables}, **{self.kwargs})"
def check_path_and_create(self):
try:
os.makedirs(self.path)
logging.info(f"Created path: {self.path}")
except FileExistsError:
logging.info(f"Path already exists: {self.path}")
pass
def interpolate(self, dim: str, method: str = 'linear', limit: int = None,
use_coordinate: Union[bool, str] = True, **kwargs):
"""
(Adapted from xarray.DataArray.interpolate_na.)
Interpolate values according to different methods.
:param dim:
Specifies the dimension along which to interpolate.
:param method:
{'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic',
'polynomial', 'barycentric', 'krog', 'pchip',
'spline', 'akima'}, optional
String indicating which method to use for interpolation:
- 'linear': linear interpolation (Default). Additional keyword
arguments are passed to ``numpy.interp``
- 'nearest', 'zero', 'slinear', 'quadratic', 'cubic',
'polynomial': are passed to ``scipy.interpolate.interp1d``. If
method=='polynomial', the ``order`` keyword argument must also be
provided.
- 'barycentric', 'krog', 'pchip', 'spline', and 'akima': use their
respective ``scipy.interpolate`` classes.
:param limit:
default None
Maximum number of consecutive NaNs to fill. Must be greater than 0
or None for no limit.
:param use_coordinate:
default True
Specifies which index to use as the x values in the interpolation
formulated as `y = f(x)`. If False, values are treated as if
equally-spaced along `dim`. If True, the IndexVariable `dim` is
used. If use_coordinate is a string, it specifies the name of a
coordinate variable to use as the index.
:param kwargs:
:return: xarray.DataArray
"""
self.data = self.data.interpolate_na(dim=dim, method=method, limit=limit, use_coordinate=use_coordinate,
**kwargs)
@staticmethod
def check_inverse_transform_params(mean, std, method) -> None:
msg = ""
if method in ['standardise', 'centre'] and mean is None:
msg += "mean, "
if method == 'standardise' and std is None:
msg += "std, "
if len(msg) > 0:
raise AttributeError(f"Inverse transform {method} cannot be executed because the following is None: {msg}")
def inverse_transform(self) -> None:
"""
Perform the inverse transformation of a previously applied transform method. Resets the stored mean, std and
the internal transform method.
:return: None
"""
def f_inverse(data, mean, std, method_inverse):
if method_inverse == 'standardise':
return statistics.standardise_inverse(data, mean, std), None, None
elif method_inverse == 'centre':
return statistics.centre_inverse(data, mean), None, None
elif method_inverse == 'normalise':
raise NotImplementedError
else:
raise NotImplementedError
if self._transform_method is None:
raise AssertionError("Inverse transformation method is not set. Data cannot be inverse transformed.")
self.check_inverse_transform_params(self.mean, self.std, self._transform_method)
self.data, self.mean, self.std = f_inverse(self.data, self.mean, self.std, self._transform_method)
self._transform_method = None
def transform(self, dim: Union[str, int] = 0, method: str = 'standardise', inverse: bool = False) -> None:
"""
This function transforms an xarray.DataArray (along dim) or pandas.DataFrame (along axis) either to mean=0
and std=1 (`method=standardise`) or centres the data to mean=0 without changing the data scale
(`method=centre`). Furthermore, this sets an internal instance attribute for later inverse transformation. This
method will raise an AssertionError if an internal transform method was already set ('inverse=False') or if the
internal transform method, internal mean and internal standard deviation weren't set ('inverse=True').
:param string/int dim: This param is not used for inverse transformation.
| for xarray.DataArray as string: name of dimension which should be standardised
| for pandas.DataFrame as int: axis of dimension which should be standardised
:param method: Choose the transformation method from 'standardise' and 'centre'. 'normalise' is not implemented
yet. This param is not used for inverse transformation.
:param inverse: Switch between transformation and inverse transformation.
:return: None. Mean, standard deviation and transformed data are stored as instance attributes (self.mean,
self.std, self.data).
"""
def f(data):
if method == 'standardise':
return statistics.standardise(data, dim)
elif method == 'centre':
return statistics.centre(data, dim)
elif method == 'normalise':
# use min/max of data or given min/max
raise NotImplementedError
else:
raise NotImplementedError
if not inverse:
if self._transform_method is not None:
raise AssertionError(f"Transform method is already set. Therefore, data was already transformed with "
f"{self._transform_method}. Please perform inverse transformation of data first.")
self.mean, self.std, self.data = f(self.data)
self._transform_method = method
else:
self.inverse_transform()
def make_history_window(self, dim: str, window: int) -> None:
"""
This function shifts the data window+1 times and stores the shifted data in a xarray.DataArray with a new
dimension 'window'. This is used to represent history in the data. Results are stored in self.history.
:param dim: dimension along which the shift is applied
:param window: number of time steps to look back in history
Note: window is treated as a negative value, which agrees with looking back on a time line. Nonetheless,
positive values are allowed but are converted to their negative counterpart.
"""
window = -abs(window)
self.history = self.shift(dim, window)
def shift(self, dim: str, window: int) -> xr.DataArray:
"""
This function uses xarray's shift function multiple times to represent history (if window <= 0)
or lead time (if window > 0).
:param dim: dimension along which the shift is applied
:param window: number of steps to shift (corresponds to the window length)
:return: xarray.DataArray with a new leading 'window' dimension
"""
start = 1
end = 1
if window <= 0:
start = window
else:
end = window + 1
res = []
for w in range(start, end):
res.append(self.data.shift({dim: -w}))
window_array = self.create_index_array('window', range(start, end))
res = xr.concat(res, dim=window_array)
return res
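# Example (illustration only): for window = -3, `shift` builds the window
# coordinate range(-3, 1) = [-3, -2, -1, 0]; entry w holds the data shifted by
# -w steps, so at time t the value from time t+w (i.e. |w| steps back) is seen.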
def make_labels(self, dim_name_of_target: str, target_var: str, dim_name_of_shift: str, window: int) -> None:
"""
This function creates a xarray.DataArray containing labels
:param dim_name_of_target: Name of dimension which contains the target variable
:param target_var: Name of target variable in 'dimension'
:param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied
:param window: lead time of label
"""
window = abs(window)
self.label = self.shift(dim_name_of_shift, window).sel({dim_name_of_target: target_var})
def history_label_nan_remove(self, dim: str) -> None:
"""
All slices along dim that contain NaNs in self.history or self.label are removed from both data sets.
This is done to present only a full matrix to keras.fit.
:param dim: dimension along which the NaN intersection is computed
:return: None
"""
intersect = []
if (self.history is not None) and (self.label is not None):
non_nan_history = self.history.dropna(dim=dim)
non_nan_label = self.label.dropna(dim=dim)
intersect = np.intersect1d(non_nan_history.coords[dim].values,
non_nan_label.coords[dim].values)
if len(intersect) == 0:
self.history = None
self.label = None
else:
self.history = self.history.sel({dim: intersect})
self.label = self.label.sel({dim: intersect})
@staticmethod
def create_index_array(index_name: str, index_value: Iterable[int]) -> xr.DataArray:
"""
This function creates a 1D xarray.DataArray with the given index name and values.
:param index_name: name of the index dimension
:param index_value: values along the index dimension
:return: 1D xarray.DataArray named after index_name
"""
ind = pd.DataFrame({'val': index_value}, index=index_value)
res = xr.Dataset.from_dataframe(ind).to_array().rename({'index': index_name}).squeeze(dim='variable', drop=True)
res.name = index_name
return res
def _slice_prep(self, data: xr.DataArray, coord: str = 'datetime') -> xr.DataArray:
"""
This function prepares all settings for slicing (start and end dates from kwargs, if given) and executes _slice.
:param data: data to slice
:param coord: name of the axis to slice along
:return: sliced data
"""
start = self.kwargs.get('start', data.coords[coord][0].values)
end = self.kwargs.get('end', data.coords[coord][-1].values)
return self._slice(data, start, end, coord)
@staticmethod
def _slice(data: xr.DataArray, start: Union[date, str], end: Union[date, str], coord: str) -> xr.DataArray:
"""
This function slices through a given data_item (for example, select only values of 2011).
:param data: data to slice
:param start: first value of the slice
:param end: last value of the slice (inclusive in label-based slicing)
:param coord: name of the axis to slice along
:return: sliced data
"""
return data.loc[{coord: slice(start, end)}]
def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray:
"""
This function sets all negative concentrations to the given minimum (zero by default). Names of all
concentrations are extracted from https://join.fz-juelich.de/services/rest/surfacedata/ #2.1 Parameters
:param data: data to check
:param minimum: lower bound to clip concentration values to
:return: data with all concentration values clipped to the minimum
"""
chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5",
"propane", "so2", "toluene"]
used_chem_vars = list(set(chem_vars) & set(self.variables))
data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum)
return data
if __name__ == "__main__":
dp = DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
print(dp)
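# A fuller usage sketch (illustration only): the typical pre-processing
# pipeline from the class docstring, run on the demo instance above. The
# window lengths and interpolation settings are arbitrary example values.
dp.interpolate('datetime', method='linear', limit=2)  # fill short data gaps
dp.transform('datetime', method='standardise')        # scale to mean=0, std=1
dp.make_history_window('datetime', 7)                 # X: 7 time steps of history
dp.make_labels('variables', 'o3', 'datetime', 3)      # y: labels with 3-step lead time
dp.history_label_nan_remove('datetime')               # drop NaN slices jointly from X and y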
__author__ = 'Lukas Leufen'
__date__ = '2019-10-21'
def to_list(arg):
"""Return arg wrapped in a list if it is not already a list."""
if not isinstance(arg, list):
arg = [arg]
return arg
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-16'
import requests
import json
import logging
import pandas as pd
import datetime as dt
from typing import Iterator, Union, List
join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
logging.basicConfig(level=logging.INFO)
def download_join(station_name: Union[str, List[str]], statvar: dict) -> [pd.DataFrame, pd.DataFrame]:
"""
Read data from JOIN/TOAR.
:param station_name: station name, e.g. 'DEBY122'
:param statvar: dict with the variable name as key (e.g. 'o3') and the statistic to apply as value (e.g. 'mean')
:returns:
- df - pandas df with all variables and statistics
- meta - pandas df with all meta information
"""
# make sure station_name parameter is a list
if not isinstance(station_name, list):
station_name = [station_name]
# load series information
opts = {'base': join_url_base, 'service': 'series', 'station_id': station_name[0]}
url = create_url(**opts)
response = requests.get(url)
station_vars = response.json()
vars_dict = {item[3].lower(): item[0] for item in station_vars}
# download all variables with given statistic
data = None
df = None
for var in _lower_list(sorted(vars_dict.keys())):
if var in statvar.keys():
logging.info('load: {}'.format(var))
# create data link
opts = {'base': join_url_base, 'service': 'stats', 'id': vars_dict[var], 'statistics': statvar[var],
'sampling': 'daily', 'capture': 0, 'min_data_length': 1460}
url = create_url(**opts)
# load data
response = requests.get(url)
data = response.json()
# correct namespace of statistics
stat = _correct_stat_name(statvar[var])
# store data in pandas dataframe
index = map(lambda s: dt.datetime.strptime(s, "%Y-%m-%d %H:%M"), data['datetime'])
if df is None:
df = pd.DataFrame(data[stat], index=index, columns=[var])
else:
df = pd.concat([df, pd.DataFrame(data[stat], index=index, columns=[var])], axis=1)
logging.debug('finished: {}'.format(var))
if data:
meta = pd.DataFrame.from_dict(data['metadata'], orient='index')
meta.columns = station_name
return df, meta
else:
raise ValueError("No data found in JOIN.")
def _correct_stat_name(stat: str) -> str:
"""
Map the given statistic name to the local namespace defined by the mapping dict. Return the given name stat
unchanged if it is not an element of the mapping namespace.
:param stat: namespace from JOIN server
:return: stat mapped to local namespace
"""
mapping = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'}
return mapping.get(stat, stat)
def _lower_list(args: List[str]) -> Iterator[str]:
"""
Lowercase all elements of the given list.
:param args: list with string entries to lowercase
:return: iterator that yields the lowercased entries
"""
for string in args:
yield string.lower()
def create_url(base: str, service: str, **kwargs: Union[str, int, float]) -> str:
"""
Create a request URL from the given base URL, service type and arbitrarily many additional keyword arguments.
:param base: base URL of the REST service
:param service: service type, e.g. series or stats
:param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum'
:return: combined URL as string
"""
url = '{}{}/?'.format(base, service) + '&'.join('{}={}'.format(k, v) for k, v in kwargs.items())
return url
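# Example (illustration only): create_url(join_url_base, 'series', station_id='DEBW107')
# returns 'https://join.fz-juelich.de/services/rest/surfacedata/series/?station_id=DEBW107'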
if __name__ == "__main__":
var_all_dic = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
'pblheight': 'maximum'}
station = 'DEBW107'
download_join(station, var_all_dic)
__author__ = 'Lukas Leufen'
__date__ = '2019-10-23'
import xarray as xr
import pandas as pd
from typing import Union, Tuple
Data = Union[xr.DataArray, pd.DataFrame]
def standardise(data: Data, dim: Union[str, int]) -> Tuple[Data, Data, Data]:
"""
This function standardises an xarray.DataArray (along dim) or pandas.DataFrame (along axis) to mean=0 and std=1
:param data:
:param string/int dim:
| for xarray.DataArray as string: name of dimension which should be standardised
| for pandas.DataFrame as int: axis of dimension which should be standardised
:return: xarray.DataArrays or pandas.DataFrames:
#. mean: Mean of data
#. std: Standard deviation of data
#. data: Standardised data
"""
return data.mean(dim), data.std(dim), (data - data.mean(dim)) / data.std(dim)
def standardise_inverse(data: Data, mean: Data, std: Data) -> Data:
"""
This is the inverse function of `standardise` and reverses the standardisation.
:param data:
:param mean:
:param std:
:return:
"""
return data * std + mean
def centre(data: Data, dim: Union[str, int]) -> Tuple[Data, None, Data]:
"""
This function centres an xarray.DataArray (along dim) or pandas.DataFrame (along axis) to mean=0
:param data:
:param string/int dim:
| for xarray.DataArray as string: name of dimension which should be standardised
| for pandas.DataFrame as int: axis of dimension which should be standardised
:return: xarray.DataArrays or pandas.DataFrames:
#. mean: Mean of data
#. std: None
#. data: Centred data
"""
return data.mean(dim), None, data - data.mean(dim)
def centre_inverse(data: Data, mean: Data) -> Data:
"""
This function is the inverse of `centre` and therefore adds the given mean values back to the data.
:param data:
:param mean:
:return:
"""
return data + mean
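# Usage sketch (illustration only): standardise a random DataFrame column-wise
# and recover the original values with the inverse transformation.
if __name__ == "__main__":
    import numpy as np
    demo = pd.DataFrame(np.random.normal(2, 2, (100, 3)))
    mean, std, standardised = standardise(demo, dim=0)
    recovered = standardise_inverse(standardised, mean, std)  # equals demo up to float error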
import pytest
import os
from src.data_preparation import DataPrep
import logging
import numpy as np
import xarray as xr
import datetime as dt
import pandas as pd
from operator import itemgetter
class TestDataPrep:
@pytest.fixture
def data(self):
return DataPrep('test/data/', 'dummy', 'DEBW107', ['o3', 'temp'], test='testKWARGS',
statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
def test_init(self, data):
assert data.path == os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
assert data.network == 'dummy'
assert data.station == ['DEBW107']
assert data.variables == ['o3', 'temp']
assert data.statistics_per_var == {'o3': 'dma8eu', 'temp': 'maximum'}
assert not all([data.mean, data.std, data.history, data.label])
assert {'test': 'testKWARGS'}.items() <= data.kwargs.items()
def test_init_no_stats(self):
with pytest.raises(NotImplementedError):
DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'])
def test_check_path_and_create(self, caplog):
caplog.set_level(logging.INFO)
d = object.__new__(DataPrep)
d.path = 'data/test'
assert not os.path.exists('data/test')
d.check_path_and_create()
assert os.path.exists('data/test')
assert caplog.messages[0] == "Created path: data/test"
d.check_path_and_create()
assert caplog.messages[1] == "Path already exists: data/test"
os.rmdir('data/test')
def test_repr(self):
d = object.__new__(DataPrep)
d.path = 'data/test'
d.network = 'dummy'
d.station = ['DEBW107']
d.variables = ['o3', 'temp']
d.kwargs = None
assert d.__repr__().rstrip() == "Dataprep(path='data/test', network='dummy', station=['DEBW107'], "\
"variables=['o3', 'temp'], **None)".rstrip()
def test_set_file_name_and_meta(self):
d = object.__new__(DataPrep)
d.path = os.path.abspath('test/data/test')
d.station = 'TESTSTATION'
d.variables = ['a', 'bc']
assert d._set_file_name() == os.path.join(os.path.abspath(os.path.dirname(__file__)),
"data/test/TESTSTATION_a_bc.nc")
assert d._set_meta_file_name() == os.path.join(os.path.abspath(os.path.dirname(__file__)),
"data/test/TESTSTATION_a_bc_meta.csv")
@pytest.mark.parametrize('opts', [{'dim': 'datetime', 'method': 'nearest', 'limit': 10, 'use_coordinate': True},
{'dim': 'datetime', 'limit': 5}, {'dim': 'datetime'}])
def test_interpolate(self, data, opts):
data_org = data.data
data.interpolate(**opts)
# set default params if empty
opts["method"] = opts.get("method", 'linear')
opts["limit"] = opts.get("limit", None)
opts["use_coordinate"] = opts.get("use_coordinate", True)
assert xr.testing.assert_equal(data_org.interpolate_na(**opts), data.data) is None
def test_transform_standardise(self, data):
assert data._transform_method is None
assert data.mean is None
assert data.std is None
data.transform('datetime')
assert data._transform_method == 'standardise'
assert np.testing.assert_almost_equal(data.data.mean('datetime').variable.values, np.array([[0, 0]])) is None
assert np.testing.assert_almost_equal(data.data.std('datetime').variable.values, np.array([[1, 1]])) is None
assert isinstance(data.mean, xr.DataArray)
assert isinstance(data.std, xr.DataArray)
@pytest.mark.parametrize('mean, std, method, msg', [(10, 3, 'standardise', ''), (6, None, 'standardise', 'std, '),
(None, 3, 'standardise', 'mean, '), (19, None, 'centre', ''),
(None, 2, 'centre', 'mean, '), (8, 2, 'centre', ''),
(None, None, 'standardise', 'mean, std, ')])
def test_check_inverse_transform_params(self, data, mean, std, method, msg):
if len(msg) > 0:
with pytest.raises(AttributeError) as e:
data.check_inverse_transform_params(mean, std, method)
assert msg in e.value.args[0]
else:
assert data.check_inverse_transform_params(mean, std, method) is None
def test_transform_centre(self, data):
assert data._transform_method is None
assert data.mean is None
assert data.std is None
data_std_org = data.data.std('datetime').variable.values
data.transform('datetime', 'centre')
assert data._transform_method == 'centre'
assert np.testing.assert_almost_equal(data.data.mean('datetime').variable.values, np.array([[0, 0]])) is None
assert np.testing.assert_almost_equal(data.data.std('datetime').variable.values, data_std_org) is None
assert data.std is None
@pytest.mark.parametrize('method', ['standardise', 'centre'])
def test_transform_inverse(self, data, method):
data_org = data.data
data.transform('datetime', method)
data.inverse_transform()
assert data._transform_method is None
assert data.mean is None
assert data.std is None
assert np.testing.assert_array_almost_equal(data_org, data.data) is None
data.transform('datetime', method)
data.transform('datetime', inverse=True)
assert data._transform_method is None
assert data.mean is None
assert data.std is None
assert np.testing.assert_array_almost_equal(data_org, data.data) is None
@pytest.mark.parametrize('method', ['normalise', 'unknownmethod'])
def test_transform_errors(self, data, method):
with pytest.raises(NotImplementedError):
data.transform('datetime', method)
data._transform_method = method
with pytest.raises(AssertionError) as e:
data.transform('datetime', method)
assert "Transform method is already set." in e.value.args[0]
@pytest.mark.parametrize('method', ['normalise', 'unknownmethod'])
def test_transform_inverse_errors(self, data, method):
with pytest.raises(AssertionError) as e:
data.inverse_transform()
assert "Inverse transformation method is not set." in e.value.args[0]
data.mean = 1
data.std = 1
data._transform_method = method
with pytest.raises(NotImplementedError):
data.inverse_transform()
def test_nan_remove_no_hist_or_label(self, data):
assert data.history is None
assert data.label is None
data.history_label_nan_remove('datetime')
assert data.history is None
assert data.label is None
data.make_history_window('datetime', 6)
assert data.history is not None
data.history_label_nan_remove('datetime')
assert data.history is None
data.make_labels('variables', 'o3', 'datetime', 2)
assert data.label is not None
data.history_label_nan_remove('datetime')
assert data.label is None
def test_nan_remove(self, data):
data.make_history_window('datetime', -12)
data.make_labels('variables', 'o3', 'datetime', 3)
shape = data.history.shape
data.history_label_nan_remove('datetime')
assert data.history.isnull().sum() == 0
assert itemgetter(0, 1, 3)(shape) == itemgetter(0, 1, 3)(data.history.shape)
assert shape[2] >= data.history.shape[2]
def test_create_index_array(self, data):
index_array = data.create_index_array('window', range(1, 4))
assert np.testing.assert_array_equal(index_array.data, [1, 2, 3]) is None
assert index_array.name == 'window'
assert index_array.coords.dims == ('window', )
index_array = data.create_index_array('window', range(0, 1))
assert np.testing.assert_array_equal(index_array.data, [0]) is None
assert index_array.name == 'window'
assert index_array.coords.dims == ('window', )
@staticmethod
def extract_window_data(res, orig, w):
slice = {'variables': ['temp'], 'Stations': 'DEBW107', 'datetime': dt.datetime(1997, 1, 6)}
window = res.sel(slice).data.flatten()
if w <= 0:
delta = w
w = abs(w)+1
else:
delta = 1
slice = {'variables': ['temp'], 'Stations': 'DEBW107',
'datetime': pd.date_range(dt.date(1997, 1, 6) + dt.timedelta(days=delta), periods=w, freq='D')}
orig_slice = orig.sel(slice).data.flatten()
return window, orig_slice
def test_shift(self, data):
res = data.shift('datetime', 4)
window, orig = self.extract_window_data(res, data.data, 4)
assert res.coords.dims == ('window', 'Stations', 'datetime', 'variables')
assert list(res.data.shape) == [4] + list(data.data.shape)
assert np.testing.assert_array_equal(orig, window) is None
res = data.shift('datetime', -3)
window, orig = self.extract_window_data(res, data.data, -3)
assert list(res.data.shape) == [4] + list(data.data.shape)
assert np.testing.assert_array_equal(orig, window) is None
res = data.shift('datetime', 0)
window, orig = self.extract_window_data(res, data.data, 0)
assert list(res.data.shape) == [1] + list(data.data.shape)
assert np.testing.assert_array_equal(orig, window) is None
def test_make_history_window(self, data):
assert data.history is None
data.make_history_window('datetime', 5)
assert data.history is not None
save_history = data.history
data.make_history_window('datetime', -5)
assert np.testing.assert_array_equal(data.history, save_history) is None
def test_make_labels(self, data):
assert data.label is None
data.make_labels('variables', 'o3', 'datetime', 3)
assert data.label.variables.data == 'o3'
assert list(data.label.shape) == [3] + list(data.data.shape)[:2]
save_label = data.label
data.make_labels('variables', 'o3', 'datetime', -3)
assert np.testing.assert_array_equal(data.label, save_label) is None
def test_slice(self, data):
res = data._slice(data.data, dt.date(1997, 1, 1), dt.date(1997, 1, 10), 'datetime')
assert itemgetter(0, 2)(res.shape) == itemgetter(0, 2)(data.data.shape)
assert res.shape[1] == 10
def test_slice_prep(self, data):
res = data._slice_prep(data.data)
assert res.shape == data.data.shape
data.kwargs['start'] = res.coords['datetime'][0].values
data.kwargs['end'] = res.coords['datetime'][9].values
res = data._slice_prep(data.data)
assert itemgetter(0, 2)(res.shape) == itemgetter(0, 2)(data.data.shape)
assert res.shape[1] == 10
def test_check_for_neg_concentrations(self, data):
res = data.check_for_negative_concentrations(data.data)
assert res.sel({'variables': 'o3'}).min() >= 0
res = data.check_for_negative_concentrations(data.data, minimum=2)
assert res.sel({'variables': 'o3'}).min() >= 2
import pytest
import xarray as xr
import pandas as pd
import numpy as np
from src.statistics import standardise, standardise_inverse, centre, centre_inverse
@pytest.fixture(scope='module')
def input_data():
return np.array([np.random.normal(2, 2, 2000),
np.random.normal(-5, 3, 2000),
np.random.normal(10, 1, 2000)]).T
@pytest.fixture(scope='module')
def pandas(input_data):
return pd.DataFrame(input_data)
@pytest.fixture(scope='module')
def xarray(input_data):
return xr.DataArray(input_data, dims=['index', 'value'])
class TestStandardise:
@pytest.mark.parametrize('data_org, dim', [(pytest.lazy_fixture('pandas'), 0),
(pytest.lazy_fixture('xarray'), 'index')])
def test_standardise(self, data_org, dim):
mean, std, data = standardise(data_org, dim)
assert np.testing.assert_almost_equal(mean, [2, -5, 10], decimal=1) is None
assert np.testing.assert_almost_equal(std, [2, 3, 1], decimal=1) is None
assert np.testing.assert_almost_equal(data.mean(dim), [0, 0, 0]) is None
assert np.testing.assert_almost_equal(data.std(dim), [1, 1, 1]) is None
@pytest.mark.parametrize('data_org, dim', [(pytest.lazy_fixture('pandas'), 0),
(pytest.lazy_fixture('xarray'), 'index')])
def test_standardise_inverse(self, data_org, dim):
mean, std, data = standardise(data_org, dim)
data_recovered = standardise_inverse(data, mean, std)
assert np.testing.assert_array_almost_equal(data_org, data_recovered) is None
class TestCentre:
@pytest.mark.parametrize('data_org, dim', [(pytest.lazy_fixture('pandas'), 0),
(pytest.lazy_fixture('xarray'), 'index')])
def test_centre(self, data_org, dim):
mean, std, data = centre(data_org, dim)
assert np.testing.assert_almost_equal(mean, [2, -5, 10], decimal=1) is None
assert std is None
assert np.testing.assert_almost_equal(data.mean(dim), [0, 0, 0]) is None
@pytest.mark.parametrize('data_org, dim', [(pytest.lazy_fixture('pandas'), 0),
(pytest.lazy_fixture('xarray'), 'index')])
def test_centre_inverse(self, data_org, dim):
mean, _, data = centre(data_org, dim)
data_recovered = centre_inverse(data, mean)
assert np.testing.assert_array_almost_equal(data_org, data_recovered) is None