
WIP: Resolve "Implement Station Preparation"

1 file  +544  −3
@@ -7,11 +7,13 @@ import datetime as dt
import logging
import os
from functools import reduce
from typing import Union, List, Iterable, Tuple
from typing import Union, List, Iterable, Tuple, Dict
from src.helpers.join import EmptyQueryResult
import numpy as np
import pandas as pd
import xarray as xr
import dask.array as da
from src.configuration import check_path_and_create
from src import helpers
@@ -25,6 +27,537 @@ num_or_list = Union[number, List[number]]
data_or_none = Union[xr.DataArray, None]
class AbstractStationPrep():
def __init__(self): #, path, station, statistics_per_var, transformation, **kwargs):
pass
# passed parameters
# self.path = os.path.abspath(path)
# self.station = helpers.to_list(station)
# self.statistics_per_var = statistics_per_var
# # self.target_dim = 'variable'
# self.transformation = self.setup_transformation(transformation)
# self.kwargs = kwargs
#
# # internal
# self.data = None
# self.meta = None
# self.variables = kwargs.get('variables', list(statistics_per_var.keys()))
# self.history = None
# self.label = None
# self.observation = None
def get_X(self):
raise NotImplementedError
def get_Y(self):
raise NotImplementedError
# def load_data(self):
# try:
# self.read_data_from_disk()
# except FileNotFoundError:
# self.download_data()
# self.load_data()
#
# def read_data_from_disk(self):
# raise NotImplementedError
#
# def download_data(self):
# raise NotImplementedError
class StationPrep(AbstractStationPrep):
def __init__(self, path, station, statistics_per_var, transformation, station_type, network, sampling, target_dim, target_var,
interpolate_dim, window_history_size, window_lead_time, **kwargs):
super().__init__() # path, station, statistics_per_var, transformation, **kwargs)
self.station_type = station_type
self.network = network
self.sampling = sampling
self.target_dim = target_dim
self.target_var = target_var
self.interpolate_dim = interpolate_dim
self.window_history_size = window_history_size
self.window_lead_time = window_lead_time
self.path = os.path.abspath(path)
self.station = helpers.to_list(station)
self.statistics_per_var = statistics_per_var
# self.target_dim = 'variable'
self.transformation = self.setup_transformation(transformation)
self.kwargs = kwargs
# internal
self.data = None
self.meta = None
self.variables = kwargs.get('variables', list(statistics_per_var.keys()))
self.history = None
self.label = None
self.observation = None
self.make_samples()
def __repr__(self):
return f"StationPrep(path='{self.path}', station={self.station}, statistics_per_var={self.statistics_per_var}, " \
f"transformation={self.transformation}, station_type='{self.station_type}', network='{self.network}', " \
f"sampling='{self.sampling}', target_dim='{self.target_dim}', target_var='{self.target_var}', " \
f"interpolate_dim='{self.interpolate_dim}', window_history_size={self.window_history_size}, " \
f"window_lead_time={self.window_lead_time}, **{self.kwargs})"
def get_transposed_history(self) -> xr.DataArray:
"""Return history.
:return: history with dimensions datetime, window, Stations, variables.
"""
return self.history.transpose("datetime", "window", "Stations", "variables").copy()
def get_transposed_label(self) -> xr.DataArray:
"""Return label.
:return: label with dimensions datetime, window (the Stations dimension is squeezed out).
"""
return self.label.squeeze("Stations").transpose("datetime", "window").copy()
def get_X(self):
return self.get_transposed_history()
def get_Y(self):
return self.get_transposed_label()
def make_samples(self):
self.load_data()
self.make_history_window(self.target_dim, self.window_history_size, self.interpolate_dim)
self.make_labels(self.target_dim, self.target_var, self.interpolate_dim, self.window_lead_time)
self.make_observation(self.target_dim, self.target_var, self.interpolate_dim)
self.remove_nan(self.interpolate_dim)
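# Illustrative note (a sketch, using the dimension and attribute names of this class): after
# make_samples, self.history, self.label and self.observation hold NaN-free, time-aligned samples,
# or are set to None if fewer than max(min_length, 1) common time steps remain after remove_nan.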
def read_data_from_disk(self, source_name=""):
"""
Load data and meta data either from local disk (preferred) or download new data using a custom download method.
Data is downloaded if no local data is available or if the parameter overwrite_local_data is true. In both
cases, downloaded data is only stored locally if store_data_locally is not disabled. If this parameter is not
set, it is assumed that data should be saved locally.
"""
source_name = source_name if len(source_name) == 0 else f" from {source_name}"
check_path_and_create(self.path)
file_name = self._set_file_name()
meta_file = self._set_meta_file_name()
if self.kwargs.get('overwrite_local_data', False):
logging.debug(f"overwrite_local_data is true, therefore reload {file_name}{source_name}")
if os.path.exists(file_name):
os.remove(file_name)
if os.path.exists(meta_file):
os.remove(meta_file)
data, self.meta = self.download_data(file_name, meta_file)
logging.debug(f"loaded new data{source_name}")
else:
try:
logging.debug(f"try to load local data from: {file_name}")
data = xr.open_dataarray(file_name)
self.meta = pd.read_csv(meta_file, index_col=0)
self.check_station_meta()
logging.debug("loading finished")
except FileNotFoundError as e:
logging.debug(e)
logging.debug(f"load new data{source_name}")
data, self.meta = self.download_data(file_name, meta_file)
logging.debug("loading finished")
# create slices and check for negative concentrations.
data = self._slice_prep(data)
self.data = self.check_for_negative_concentrations(data)
def download_data_from_join(self, file_name: str, meta_file: str) -> Tuple[xr.DataArray, pd.DataFrame]:
"""
Download data from TOAR database using the JOIN interface.
Data is transformed into an xarray dataset. If the class attribute store_data_locally is true, data is additionally
stored locally using the given names for the data file and the meta file.
:param file_name: name of file to save data to (containing full path)
:param meta_file: name of the meta data file (also containing full path)
:return: downloaded data and its meta data
"""
df_all = {}
df, meta = join.download_join(station_name=self.station, stat_var=self.statistics_per_var,
station_type=self.station_type, network_name=self.network, sampling=self.sampling)
df_all[self.station[0]] = df
# convert df_all to xarray
xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
xarr = xr.Dataset(xarr).to_array(dim='Stations')
if self.kwargs.get('store_data_locally', True):
# save locally as nc/csv file
xarr.to_netcdf(path=file_name)
meta.to_csv(meta_file)
return xarr, meta
def download_data(self, file_name, meta_file):
data, meta = self.download_data_from_join(file_name, meta_file)
return data, meta
def check_station_meta(self):
"""
Search for the entries in the meta data and compare their values with the requested values.
Raises a FileNotFoundError if the values do not match.
"""
if self.station_type is not None:
check_dict = {"station_type": self.station_type, "network_name": self.network}
for (k, v) in check_dict.items():
if v is None:
continue
if self.meta.at[k, self.station[0]] != v:
logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
f"{self.meta.at[k, self.station[0]]} (local). Raise FileNotFoundError to trigger new "
f"grapping from web.")
raise FileNotFoundError
def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray:
"""
Set all negative concentrations to zero.
Names of all concentrations are extracted from https://join.fz-juelich.de/services/rest/surfacedata/
#2.1 Parameters. Currently, this check is applied to "benzene", "ch4", "co", "ethane", "no", "no2", "nox",
"o3", "ox", "pm1", "pm10", "pm2p5", "propane", "so2", and "toluene".
:param data: data array containing variables to check
:param minimum: minimum value, by default this should be 0
:return: corrected data
"""
chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5",
"propane", "so2", "toluene"]
used_chem_vars = list(set(chem_vars) & set(self.variables))
data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum)
return data
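# Example (illustrative values): with minimum=0, a raw 'o3' value of -0.5 is clipped to 0.0,
# while variables not listed in chem_vars (e.g. a temperature variable) are left untouched.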
def shift(self, dim: str, window: int) -> xr.DataArray:
"""
Shift data multiple times to represent history (if window <= 0) or lead time (if window > 0).
:param dim: dimension along shift is applied
:param window: number of steps to shift (corresponds to the window length)
:return: shifted data
"""
start = 1
end = 1
if window <= 0:
start = window
else:
end = window + 1
res = []
for w in range(start, end):
res.append(self.data.shift({dim: -w}))
window_array = self.create_index_array('window', range(start, end), squeeze_dim=self.target_dim)
res = xr.concat(res, dim=window_array)
return res
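# Worked example for shift (values chosen for illustration): a history call with window=-7
# iterates over range(-7, 1), i.e. window labels -7..0 with the current step included,
# whereas a lead-time call with window=3 iterates over range(1, 4), i.e. labels 1..3.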
@staticmethod
def create_index_array(index_name: str, index_value: Iterable[int], squeeze_dim: str) -> xr.DataArray:
"""
Create a 1D xr.DataArray with the given index name and values.
:param index_name: name of the dimension
:param index_value: values of this dimension
:param squeeze_dim: name of the helper dimension created during the conversion and squeezed out again
:return: the created index array
"""
ind = pd.DataFrame({'val': index_value}, index=index_value)
# res = xr.Dataset.from_dataframe(ind).to_array().rename({'index': index_name}).squeeze(dim=squeez/e_dim, drop=True)
res = xr.Dataset.from_dataframe(ind).to_array(squeeze_dim).rename({'index': index_name}).squeeze(
dim=squeeze_dim,
drop=True
)
res.name = index_name
return res
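# Example (values are illustrative): create_index_array('window', range(1, 4), squeeze_dim='variables')
# yields a 1D DataArray named 'window' with the values [1, 2, 3]; the helper dimension 'variables'
# only exists during the conversion and is squeezed out again.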
def _set_file_name(self):
all_vars = sorted(self.statistics_per_var.keys())
return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}.nc")
def _set_meta_file_name(self):
all_vars = sorted(self.statistics_per_var.keys())
return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}_meta.csv")
def interpolate(self, dim: str, method: str = 'linear', limit: int = None, use_coordinate: Union[bool, str] = True,
**kwargs):
"""
Interpolate values according to different methods.
(Copy paste from dataarray.interpolate_na)
:param dim:
Specifies the dimension along which to interpolate.
:param method:
{'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic',
'polynomial', 'barycentric', 'krog', 'pchip',
'spline', 'akima'}, optional
String indicating which method to use for interpolation:
- 'linear': linear interpolation (Default). Additional keyword
arguments are passed to ``numpy.interp``
- 'nearest', 'zero', 'slinear', 'quadratic', 'cubic',
'polynomial': are passed to ``scipy.interpolate.interp1d``. If
method=='polynomial', the ``order`` keyword argument must also be
provided.
- 'barycentric', 'krog', 'pchip', 'spline', and 'akima': use their
respective ``scipy.interpolate`` classes.
:param limit:
default None
Maximum number of consecutive NaNs to fill. Must be greater than 0
or None for no limit.
:param use_coordinate:
default True
Specifies which index to use as the x values in the interpolation
formulated as `y = f(x)`. If False, values are treated as if
equally-spaced along `dim`. If True, the IndexVariable `dim` is
used. If use_coordinate is a string, it specifies the name of a
coordinate variable to use as the index.
:param kwargs: further keyword arguments passed on to xarray.DataArray.interpolate_na
:return: None, the interpolated data is stored in the data attribute
"""
self.data = self.data.interpolate_na(dim=dim, method=method, limit=limit, use_coordinate=use_coordinate,
**kwargs)
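# Usage sketch (parameter values are illustrative): self.interpolate('datetime', method='linear', limit=1)
# fills gaps of at most one consecutive missing value along the time axis and stores the result in self.data.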
def make_history_window(self, dim_name_of_inputs: str, window: int, dim_name_of_shift: str) -> None:
"""
Create a xr.DataArray containing history data.
Shift the data window+1 times and create an xarray with a new dimension 'window' containing the shifted
data. This is used to represent history in the data. The result is stored in the history attribute.
:param dim_name_of_inputs: Name of dimension which contains the input variables
:param window: number of time steps to look back in history
Note: window is treated as a negative value, in agreement with looking back along the time line.
Positive values are allowed but are converted to their negative counterpart.
:param dim_name_of_shift: Dimension along shift will be applied
"""
window = -abs(window)
self.history = self.shift(dim_name_of_shift, window).sel({dim_name_of_inputs: self.variables})
def make_labels(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str,
window: int) -> None:
"""
Create a xr.DataArray containing labels.
Labels are defined as the consecutive target values (t+1, ..., t+n) following the current time step t. The result
is stored in the label attribute.
:param dim_name_of_target: Name of dimension which contains the target variable
:param target_var: Name of target variable in 'dimension'
:param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied
:param window: lead time of label
"""
window = abs(window)
self.label = self.shift(dim_name_of_shift, window).sel({dim_name_of_target: target_var})
def make_observation(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str) -> None:
"""
Create a xr.DataArray containing observations.
Observations are defined as the value of the current time step t. The result is stored in the observation attribute.
:param dim_name_of_target: Name of dimension which contains the observation variable
:param target_var: Name of observation variable(s) in 'dimension'
:param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied
"""
self.observation = self.shift(dim_name_of_shift, 0).sel({dim_name_of_target: target_var})
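# Note (derived from shift above): a window of 0 leads to range(0, 1), so the observation is
# simply the unshifted value at the current time step t, carrying the single window label 0.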
def remove_nan(self, dim: str) -> None:
"""
Remove all slices along dim which contain NaNs in history, label or observation.
This is done to present only a full matrix to keras.fit. The history, label, and observation attributes are
updated accordingly.
:param dim: dimension along which the removal is performed
"""
intersect = []
if (self.history is not None) and (self.label is not None):
non_nan_history = self.history.dropna(dim=dim)
non_nan_label = self.label.dropna(dim=dim)
non_nan_observation = self.observation.dropna(dim=dim)
intersect = reduce(np.intersect1d, (non_nan_history.coords[dim].values, non_nan_label.coords[dim].values,
non_nan_observation.coords[dim].values))
min_length = self.kwargs.get("min_length", 0)
if len(intersect) < max(min_length, 1):
self.history = None
self.label = None
self.observation = None
else:
self.history = self.history.sel({dim: intersect})
self.label = self.label.sel({dim: intersect})
self.observation = self.observation.sel({dim: intersect})
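# Worked example (illustrative dates): if history is valid on {d1, d2, d3}, label on {d2, d3, d4}
# and observation on {d2, d3}, the intersection is {d2, d3}; with min_length <= 2 all three
# attributes are reduced to these two time steps, otherwise they are set to None.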
def _slice_prep(self, data: xr.DataArray, coord: str = 'datetime') -> xr.DataArray:
"""
Set start and end date for slicing and execute self._slice().
:param data: data to slice
:param coord: name of axis to slice
:return: sliced data
"""
start = self.kwargs.get('start', data.coords[coord][0].values)
end = self.kwargs.get('end', data.coords[coord][-1].values)
return self._slice(data, start, end, coord)
@staticmethod
def _slice(data: xr.DataArray, start: Union[date, str], end: Union[date, str], coord: str) -> xr.DataArray:
"""
Slice through a given data_item (for example select only values of 2011).
:param data: data to slice
:param start: start date of slice
:param end: end date of slice
:param coord: name of axis to slice
:return: sliced data
"""
return data.loc[{coord: slice(str(start), str(end))}]
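# Example (illustrative): _slice(data, "2011-01-01", "2011-12-31", "datetime") returns only the
# values of 2011; since start and end are cast to str, passing datetime.date objects works as well.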
def setup_transformation(self, transformation: Dict):
"""
Set up transformation by extracting all relevant information.
Extract all information from the transformation dictionary. Possible keys are scope, method, mean, and std. Scope
can either be station or data. Station scope means that the data transformation is performed for each station
independently (somewhat like batch normalisation), whereas data scope means a transformation applied to the
entire data set.
* If using data scope, mean and standard deviation (each only if required by the transformation method) can either
be calculated accurately or as an estimate (faster implementation). This must be set in the dictionary either
as "mean": "accurate" or "mean": "estimate". In both cases, the required statistics are calculated and saved.
After these calculations, the mean key is overwritten by the actual values to use.
* If using station scope, no additional information is required.
* If a transformation should be applied on the basis of existing values, these need to be provided in the
respective keys "mean" and "std" (again only if required by the given method).
:param transformation: the transformation dictionary as described above.
:return: updated transformation dictionary
"""
if transformation is None:
return
transformation = transformation.copy()
scope = transformation.get("scope", "station")
method = transformation.get("method", "standardise")
mean = transformation.get("mean", None)
std = transformation.get("std", None)
if scope == "data":
if isinstance(mean, str):
if mean == "accurate":
mean, std = self.calculate_accurate_transformation(method)
elif mean == "estimate":
mean, std = self.calculate_estimated_transformation(method)
else:
raise ValueError(f"given mean attribute must either be equal to strings 'accurate' or 'estimate' or"
f"be an array with already calculated means. Given was: {mean}")
elif scope == "station":
mean, std = None, None
else:
raise ValueError(f"Scope argument can either be 'station' or 'data'. Given was: {scope}")
transformation["method"] = method
transformation["mean"] = mean
transformation["std"] = std
return transformation
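# Example transformation dictionaries (illustrative, following the docstring above):
#   {"scope": "data", "method": "standardise", "mean": "estimate"}  -> mean/std estimated over all stations
#   {"scope": "data", "method": "centre", "mean": "accurate"}       -> exact mean via dask, std stays None
#   {"scope": "station", "method": "standardise"}                   -> per-station scaling, mean and std set to None here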
def calculate_accurate_transformation(self, method: str) -> Tuple[data_or_none, data_or_none]:
"""
Calculate accurate transformation statistics.
Use all stations of this generator and calculate mean and standard deviation on the entire data set using dask.
Because there can be a lot of data, this can take a while.
:param method: name of transformation method
:return: accurately calculated mean and std (depending on transformation method)
"""
tmp = []
mean = None
std = None
for station in self.stations:
try:
data = self.DataPrep(self.data_path, station, self.variables, station_type=self.station_type,
**self.kwargs)
chunks = (1, 100, data.data.shape[2])
tmp.append(da.from_array(data.data.data, chunks=chunks))
except EmptyQueryResult:
continue
tmp = da.concatenate(tmp, axis=1)
if method in ["standardise", "centre"]:
mean = da.nanmean(tmp, axis=1).compute()
mean = xr.DataArray(mean.flatten(), coords={"variables": sorted(self.variables)}, dims=["variables"])
if method == "standardise":
std = da.nanstd(tmp, axis=1).compute()
std = xr.DataArray(std.flatten(), coords={"variables": sorted(self.variables)}, dims=["variables"])
else:
raise NotImplementedError
return mean, std
def calculate_estimated_transformation(self, method):
"""
Calculate estimated transformation statistics.
Use all stations of this generator and calculate mean and standard deviation first for each station separately.
Afterwards, calculate the average mean and standard deviation as estimated statistics. Because this method does
not consider the length of each data set, the estimated mean differs from the true data mean. Furthermore,
the estimated standard deviation is taken as the (also unweighted) mean of all per-station deviations. This is
mathematically not correct, but it is still a rough and faster estimate of the true standard deviation. Do not use
this method for further statistical calculations. However, in the scope of data preparation for machine learning,
this approach is decent ("it is just scaling").
:param method: name of transformation method
:return: estimated mean and std (depending on transformation method)
"""
data = [[]] * len(self.variables)
coords = {"variables": self.variables, "Stations": range(0)}
mean = xr.DataArray(data, coords=coords, dims=["variables", "Stations"])
std = xr.DataArray(data, coords=coords, dims=["variables", "Stations"])
for station in self.stations:
try:
data = self.DataPrep(self.data_path, station, self.variables, station_type=self.station_type,
**self.kwargs)
data.transform("datetime", method=method)
mean = mean.combine_first(data.mean)
std = std.combine_first(data.std)
data.transform("datetime", method=method, inverse=True)
except EmptyQueryResult:
continue
return mean.mean("Stations") if mean.shape[1] > 0 else None, std.mean("Stations") if std.shape[1] > 0 else None
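# Worked example (illustrative numbers): two stations with means 30 and 40 yield an estimated
# mean of 35, regardless of how many samples each station contributes; the same unweighted
# averaging is applied to the standard deviations.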
def load_data(self):
try:
self.read_data_from_disk()
except FileNotFoundError:
self.download_data()
self.load_data()
class AbstractDataPrep(object):
"""
This class prepares data to be used in neural networks.
@@ -554,5 +1087,13 @@ class AbstractDataPrep(object):
if __name__ == "__main__":
dp = AbstractDataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
print(dp)
# dp = AbstractDataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
# print(dp)
statistics_per_var = {'o3': 'dma8eu', 'temp-rea-miub': 'maximum'}
sp = StationPrep(path='/home/felix/PycharmProjects/mlt_new/data/', station='DEBY122',
statistics_per_var=statistics_per_var, transformation={}, station_type='background',
network='UBA', sampling='daily', target_dim='variables', target_var='o3',
interpolate_dim='datetime', window_history_size=7, window_lead_time=3)
sp.get_X()
sp.get_Y()
print(sp)
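# The transposed samples can then be fed to a model (illustrative, dimension names as documented
# in get_transposed_history/get_transposed_label):
#   X = sp.get_X()  # dims: datetime, window, Stations, variables
#   Y = sp.get_Y()  # dims: datetime, window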