Skip to content
Snippets Groups Projects
Commit 608d219d authored by lukas leufen's avatar lukas leufen
Browse files

implemented transform and some tests

parent c91b77fe
Branches
Tags
2 merge requests!6updated inception model and data prep class,!4data prep class
......@@ -7,23 +7,17 @@ import pandas as pd
import logging
import os
from src import join, helpers
from typing import Union, List
from src import statistics
from typing import Union, List, Dict
class DataPrep(object):
    """
    Loads and prepares hourly station data (stored as netCDF, with csv meta data)
    for one station and a set of variables, and provides transformation helpers
    (standardise/centre) for later model training.
    """

    def __init__(self, path: str, network: str, station: Union[str, List[str]], variables: List[str], **kwargs):
        """
        :param path: data directory; stored as an absolute path
        :param network: name of the measurement network
        :param station: single station id or list of ids; normalised to a list
        :param variables: list of variable names to load
        :param kwargs: forwarded options; 'statistics_per_var' is currently required,
            otherwise construction raises NotImplementedError
        """
        self.path = os.path.abspath(path)
        self.network = network
        self.station = helpers.to_list(station)
        self.variables = variables
        self.mean = None
        self.std = None
        self.df = None
        # NOTE(review): a hunk of the original __init__ is not visible in this diff;
        # the test suite also checks `history` and `label`, so they are presumably
        # initialised to None here — confirm against the full file.
        self.history = None
        self.label = None
        self.kwargs = kwargs
        self.data = None
        self.meta = None
        # remembers which transformation was applied, so it can be inverted later
        self._transform_method = None
        self.statistics_per_var = kwargs.get("statistics_per_var", None)
        if self.statistics_per_var is not None:
            self.load_data()
        else:
            # loading without per-variable statistics is not supported yet
            raise NotImplementedError
def load_data(self):
self.check_path_and_create()
......@@ -54,30 +56,112 @@ class DataPrep:
self.meta.to_csv(meta_file)
def _set_file_name(self):
    """Return the path of the netCDF data file for this station/variable set.

    Variables are sorted so the file name is stable regardless of input order.
    """
    return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(sorted(self.variables))}.nc")
def _set_meta_file_name(self):
    """Return the path of the csv meta-data file for this station/variable set.

    Same naming scheme as `_set_file_name`, with a `_meta.csv` suffix.
    """
    return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(sorted(self.variables))}_meta.csv")
def __repr__(self):
    """Debug representation mirroring the constructor call."""
    # NOTE: the literal spelling "Dataprep" (lower-case p) is asserted by the tests
    return f"Dataprep(path='{self.path}', network='{self.network}', station={self.station}, " \
           f"variables={self.variables}, **{self.kwargs})"
def check_path_and_create(self):
    """Create `self.path` (including parents) if it does not exist yet.

    Logs whether the directory was created or already present; never raises
    for an existing directory.
    """
    try:
        os.makedirs(self.path)
        logging.info(f"Created path: {self.path}")
    except FileExistsError:
        logging.info(f"Path already exists: {self.path}")
def interpolate(self, dim: str = None, method: str = 'linear', limit: int = None,
                use_coordinate: Union[bool, str] = True, **kwargs):
    """
    Interpolate missing values in `self.data` in place (thin wrapper around
    xarray's `DataArray.interpolate_na`; parameter description adapted from there).

    :param dim: dimension along which to interpolate
    :param method: {'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic',
        'polynomial', 'barycentric', 'krog', 'pchip', 'spline', 'akima'}, optional.
        - 'linear' (default): additional keyword arguments are passed to ``numpy.interp``
        - 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'polynomial': passed to
          ``scipy.interpolate.interp1d``; for 'polynomial' the ``order`` keyword
          argument must also be provided
        - 'barycentric', 'krog', 'pchip', 'spline', 'akima': use the respective
          ``scipy.interpolate`` classes
    :param limit: maximum number of consecutive NaNs to fill; must be greater
        than 0, or None (default) for no limit
    :param use_coordinate: which index to use as the x values in the interpolation
        `y = f(x)`. False: values are treated as equally spaced along `dim`;
        True (default): the IndexVariable `dim` is used; a string names a
        coordinate variable to use as the index.
    :param kwargs: forwarded to ``interpolate_na``
    :return: None; `self.data` is replaced by the interpolated DataArray
    """
    self.data = self.data.interpolate_na(dim=dim, method=method, limit=limit, use_coordinate=use_coordinate,
                                         **kwargs)
def restandardise(self, data, dim='variables', **kwargs):
    """
    Bring standardised `data` back to its original scale using the mean/std
    stored on this object.

    :param data: data to inverse-transform
    :param dim: dimension holding the variable coordinate
    :param kwargs: may contain 'variables' to restrict mean/std to a subset
    :return: restandardised data
    """
    # NOTE(review): `FKf` is neither defined nor imported in this module, so this
    # method raises NameError when called — presumably the helper moved to the
    # `statistics` module; confirm against the rest of the commit.
    variables = kwargs.get('variables', None)
    if variables is None:
        return FKf.restandardize(data, mean=self.mean, std=self.std, stand=True)
    else:
        # restrict stored statistics to the requested variables
        return FKf.restandardize(data,
                                 mean=self.mean.sel({dim: variables}).values,
                                 std=self.std.sel({dim: variables}).values,
                                 stand=True)
def transform(self, dim: Union[str, int] = 0, method: str = 'standardise') -> None:
    """
    Transform `self.data` (xarray.DataArray along dimension `dim`, or
    pandas.DataFrame along axis `dim`) in place, and remember the method in
    `self._transform_method` for a later inverse transformation.

    * 'standardise' (default): mean 0, std 1
    * 'centre': mean 0, scale unchanged
    * 'normalise': not implemented yet

    Sets `self.mean` and `self.std` and replaces `self.data`.

    :param dim: dimension name (xarray, str) or axis (pandas, int) to transform along
    :param method: 'standardise' or 'centre'
    :raises AssertionError: if the data was already transformed
    :raises NotImplementedError: for 'normalise' or any unknown method
    """
    def f(data):
        # dispatch on the requested transformation method
        if method == 'standardise':
            return statistics.standardise(data, dim)
        elif method == 'centre':
            return statistics.centre(data, dim)
        elif method == 'normalise':
            # use min/max of data or given min/max
            raise NotImplementedError
        else:
            raise NotImplementedError

    # refuse to transform twice — statistics would no longer refer to the raw data
    if self._transform_method is not None:
        raise AssertionError(f"Transform method is already set. Therefore, data was already transformed with "
                             f"{self._transform_method}. Please perform inverse transformation of data first.")
    self._transform_method = method
    self.mean, self.std, self.data = f(self.data)
def make_history_window(self, dim, window):
    # TODO: not implemented yet — will build the sliding history window along `dim`
    raise NotImplementedError
......@@ -95,6 +179,13 @@ class DataPrep:
def create_indexarray(index_name, index_values):
    # TODO: not implemented yet; takes no `self` — presumably decorated
    # @staticmethod just above this diff hunk; confirm against the full file
    raise NotImplementedError
def _slice_prep(self, data, coord='datetime'):
    # TODO: not implemented yet — will prepare slicing of `data` along `coord`
    raise NotImplementedError
@staticmethod
def _slice(data, start, end, coord):
    # TODO: not implemented yet — will cut `data` between `start` and `end` on `coord`
    raise NotImplementedError
if __name__ == "__main__":
......
import pytest
import os
from src.data_preparation import DataPrep
import logging
import numpy as np
import xarray as xr
class TestDataPrep:
    # Unit tests for src.data_preparation.DataPrep. Several tests build a bare
    # instance via object.__new__ to skip __init__ (which touches the file system).

    @pytest.fixture
    def data(self):
        # statistics_per_var must be given, otherwise DataPrep.__init__ raises NotImplementedError
        return DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], test='testKWARGS',
                        statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})

    def test_init(self, data):
        assert data.path == os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
        assert data.network == 'dummy'
        assert data.station == ['DEBW107']
        assert data.variables == ['o3', 'temp']
        assert data.statistics_per_var == {'o3': 'dma8eu', 'temp': 'maximum'}
        # none of the lazily filled attributes may be set right after construction
        assert not all([data.mean, data.std, data.df, data.history, data.label])
        assert {'test': 'testKWARGS'}.items() <= data.kwargs.items()

    def test_init_no_stats(self):
        # missing statistics_per_var is currently unsupported
        with pytest.raises(NotImplementedError):
            DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'])

    def test_check_path_and_create(self, caplog):
        caplog.set_level(logging.INFO)
        d = object.__new__(DataPrep)
        d.path = 'data/test'
        assert not os.path.exists('data/test')
        d.check_path_and_create()
        assert os.path.exists('data/test')
        assert caplog.messages[0] == "Created path: data/test"
        # second call must not raise and must log the already-exists branch
        d.check_path_and_create()
        assert caplog.messages[1] == "Path already exists: data/test"
        os.rmdir('data/test')

    def test_repr(self):
        d = object.__new__(DataPrep)
        d.path = 'data/test'
        d.network = 'dummy'
        d.station = ['DEBW107']
        d.variables = ['o3', 'temp']
        d.kwargs = None
        assert d.__repr__().rstrip() == "Dataprep(path='data/test', network='dummy', station=['DEBW107'], "\
                                        "variables=['o3', 'temp'], **None)".rstrip()

    def test_set_file_name_and_meta(self):
        d = object.__new__(DataPrep)
        d.path = os.path.abspath('data/test')
        d.station = 'TESTSTATION'
        d.variables = ['a', 'bc']
        assert d._set_file_name() == os.path.join(os.path.abspath(os.path.dirname(__file__)),
                                                  "data/test/TESTSTATION_a_bc.nc")
        assert d._set_meta_file_name() == os.path.join(os.path.abspath(os.path.dirname(__file__)),
                                                       "data/test/TESTSTATION_a_bc_meta.csv")

    def test_interpolate(self):
        # TODO: interpolate is a thin wrapper around xarray's interpolate_na; no test yet
        pass

    def test_transform_standardise(self, data):
        assert data._transform_method is None
        assert data.mean is None
        assert data.std is None
        data.transform('datetime')
        assert data._transform_method == 'standardise'
        # after standardisation: mean ~ 0 and std ~ 1 along 'datetime'
        assert np.testing.assert_almost_equal(data.data.mean('datetime').variable.values, np.array([[0, 0]])) is None
        assert np.testing.assert_almost_equal(data.data.std('datetime').variable.values, np.array([[1, 1]])) is None
        assert isinstance(data.mean, xr.DataArray)
        assert isinstance(data.std, xr.DataArray)

    def test_transform_centre(self, data):
        assert data._transform_method is None
        assert data.mean is None
        assert data.std is None
        data_std_org = data.data.std('datetime'). variable.values
        data.transform('datetime', 'centre')
        assert data._transform_method == 'centre'
        # centring removes the mean but must keep the spread unchanged
        assert np.testing.assert_almost_equal(data.data.mean('datetime').variable.values, np.array([[0, 0]])) is None
        assert np.testing.assert_almost_equal(data.data.std('datetime').variable.values, data_std_org) is None
        # centre does not compute a std
        assert data.std is None
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment