diff --git a/src/data_handling/advanced_data_handling.py b/src/data_handling/advanced_data_handling.py
new file mode 100644
index 0000000000000000000000000000000000000000..e36e0c75fc9107431a69482d46755acbdf5334bd
--- /dev/null
+++ b/src/data_handling/advanced_data_handling.py
@@ -0,0 +1,263 @@
+
+__author__ = 'Lukas Leufen'
+__date__ = '2020-07-08'
+
+
+from src.helpers import to_list, remove_items
+import numpy as np
+import xarray as xr
+import pickle
+import os
+import pandas as pd
+import datetime as dt
+
+from typing import Union, List, Tuple
+import logging
+from functools import reduce
+
+number = Union[float, int]
+num_or_list = Union[number, List[number]]
+
+
+class DummyDataSingleStation:  # pragma: no cover
+
+    def __init__(self, name, number_of_samples=None):
+        self.name = name
+        self.number_of_samples = number_of_samples if number_of_samples is not None else np.random.randint(100, 150)
+
+    def get_X(self):
+        X1 = np.random.randint(0, 10, size=(self.number_of_samples, 14, 5))  # samples, window, variables
+        datelist = pd.date_range(dt.datetime.today().date(), periods=self.number_of_samples, freq="H").tolist()
+        return xr.DataArray(X1, dims=['datetime', 'window', 'variables'], coords={"datetime": datelist,
+                                                                                  "window": range(14),
+                                                                                  "variables": range(5)})
+
+    def get_Y(self):
+        Y1 = np.round(0.5 * np.random.randn(self.number_of_samples, 5, 1), 1)  # samples, window, variables
+        datelist = pd.date_range(dt.datetime.today().date(), periods=self.number_of_samples, freq="H").tolist()
+        return xr.DataArray(Y1, dims=['datetime', 'window', 'variables'], coords={"datetime": datelist,
+                                                                                  "window": range(5),
+                                                                                  "variables": range(1)})
+
+    def __str__(self):
+        return self.name
+
+
+class DataPreparation:
+
+    def __init__(self, id_class, interpolate_dim: str, store_path, neighbors=None, min_length=0,
+                 extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False):
+        self.id_class = id_class
+        self.neighbors = to_list(neighbors) if neighbors is not None else []
+        self.interpolate_dim = interpolate_dim
+        self.min_length = min_length
+        self._X = None
+        self._Y = None
+        self._X_extreme = None
+        self._Y_extreme = None
+        self._save_file = os.path.join(store_path, f"data_preparation_{str(self.id_class)}.pickle")
+        self._collection = []
+        self._create_collection()
+        self.harmonise_X()
+        self.multiply_extremes(extreme_values, extremes_on_right_tail_only, dim=self.interpolate_dim)
+        self._store(fresh_store=True)
+
+    def _reset_data(self):
+        self._X, self._Y, self._X_extreme, self._Y_extreme = None, None, None, None
+
+    def _cleanup(self):
+        directory = os.path.dirname(self._save_file)
+        if os.path.exists(directory) is False:
+            os.makedirs(directory)
+        if os.path.exists(self._save_file):
+            os.remove(self._save_file)  # the save file is a single pickle file, not a directory
+
+    def _store(self, fresh_store=False):
+        self._cleanup() if fresh_store is True else None
+        data = {"X": self._X, "Y": self._Y, "X_extreme": self._X_extreme, "Y_extreme": self._Y_extreme}
+        with open(self._save_file, "wb") as f:
+            pickle.dump(data, f)
+        logging.debug(f"save pickle data to {self._save_file}")
+        self._reset_data()
+
+    def _load(self):
+        try:
+            with open(self._save_file, "rb") as f:
+                data = pickle.load(f)
+            logging.debug(f"load pickle data from {self._save_file}")
+            self._X, self._Y = data["X"], data["Y"]
+            self._X_extreme, self._Y_extreme = data["X_extreme"], data["Y_extreme"]
+        except FileNotFoundError:
+            pass
+
+    def get_data(self, upsampling=False, as_numpy=True):
+        self._load()
+        X = self.get_X(upsampling, as_numpy)
+        Y = self.get_Y(upsampling, as_numpy)
+        self._reset_data()
+        return X, Y
+
+    def _create_collection(self):
+        for data_class in [self.id_class] + self.neighbors:
+            self._collection.append(data_class)
+
+    def __repr__(self):
+        return ";".join(list(map(lambda x: str(x), self._collection)))
+
+    def get_X_original(self):
+        X = []
+        for data in self._collection:
+            X.append(data.get_X())
+        return X
+
+    def get_Y_original(self):
+        Y = self._collection[0].get_Y()
+        return Y
+
+    @staticmethod
+    def _to_numpy(d):
+        return list(map(lambda x: np.copy(x), d))
+
+    def get_X(self, upsampling=False, as_numpy=True):
+        no_data = (self._X is None)
+        self._load() if no_data is True else None
+        X = self._X if upsampling is False else self._X_extreme
+        self._reset_data() if no_data is True else None
+        return self._to_numpy(X) if as_numpy is True else X
+
+    def get_Y(self, upsampling=False, as_numpy=True):
+        no_data = (self._Y is None)
+        self._load() if no_data is True else None
+        Y = self._Y if upsampling is False else self._Y_extreme
+        self._reset_data() if no_data is True else None
+        return self._to_numpy([Y]) if as_numpy is True else Y
+
+    def harmonise_X(self):
+        X_original, Y_original = self.get_X_original(), self.get_Y_original()
+        dim = self.interpolate_dim
+        intersect = reduce(np.intersect1d, map(lambda x: x.coords[dim].values, X_original))
+        if len(intersect) < max(self.min_length, 1):
+            X, Y = None, None
+        else:
+            X = list(map(lambda x: x.sel({dim: intersect}), X_original))
+            Y = Y_original.sel({dim: intersect})
+        self._X, self._Y = X, Y
+
+    def multiply_extremes(self, extreme_values: num_or_list = 1., extremes_on_right_tail_only: bool = False,
+                          timedelta: Tuple[int, str] = (1, 'm'), dim="datetime"):
+        """
+        Multiply extremes.
+
+        This method extracts extreme values from self._Y which are defined in the argument extreme_values. One can
+        also decide to only extract extremes on the right tail of the distribution. When extreme_values is a list of
+        floats/ints, all values larger than extreme_values (and smaller than -extreme_values; extraction is performed
+        in standardised space) are extracted iteratively. If for example extreme_values = [1., 2.], then a value of
+        1.5 would be extracted once (for the 0th entry in the list), while a value of 2.5 would be extracted twice
+        (once for each entry). Timedelta is used to mark the extracted values by adding one minute to each timestamp.
+        As TOAR data are hourly, these "artificial" data points can easily be identified later. Extreme inputs and
+        labels are stored in self._X_extreme and self._Y_extreme, respectively.
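+
+        A short illustration of the marking (values made up): with extreme_values=[2.] and timedelta=(1, 'm'), a
+        label of 2.5 observed at 2020-07-08 13:00 is appended a second time at 2020-07-08 13:01, together with the
+        corresponding inputs.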
+
+        :param extreme_values: user definition of extreme
+        :param extremes_on_right_tail_only: if False, also multiply values which are smaller than -extreme_values;
+            if True, only extract values larger than extreme_values
+        :param timedelta: used as arguments for np.timedelta64 in order to mark extreme values on datetime
+        """
+        # check if X or Y is None
+        if (self._X is None) or (self._Y is None):
+            logging.debug(f"{str(self.id_class)} has no data for X or Y, skip multiply extremes")
+            return
+        if extreme_values is None:
+            logging.debug(f"No extreme values given, skip multiply extremes")
+            self._X_extreme, self._Y_extreme = self._X, self._Y
+            return
+
+        # check type of inputs
+        extreme_values = to_list(extreme_values)
+        for i in extreme_values:
+            if not isinstance(i, number.__args__):
+                raise TypeError(f"Elements of list extreme_values have to be {number.__args__}, but at least element "
+                                f"{i} is type {type(i)}")
+
+        for extr_val in sorted(extreme_values):
+            # check if some extreme values are already extracted
+            if (self._X_extreme is None) or (self._Y_extreme is None):
+                X = self._X
+                Y = self._Y
+            else:  # one extreme value iteration is already done: self._Y_extreme is NOT None...
+                X = self._X_extreme
+                Y = self._Y_extreme
+
+            # extract extremes based on occurrence in labels
+            other_dims = remove_items(list(Y.dims), dim)
+            if extremes_on_right_tail_only:
+                extreme_idx = (Y > extr_val).any(dim=other_dims)
+            else:
+                extreme_idx = xr.concat([(Y < -extr_val).any(dim=other_dims[0]),
+                                         (Y > extr_val).any(dim=other_dims[0])],
+                                        dim=other_dims[1]).any(dim=other_dims[1])
+
+            extremes_X = list(map(lambda x: x.sel(**{dim: extreme_idx}), X))
+            self._add_timedelta(extremes_X, dim, timedelta)
+            # extremes_X = list(map(lambda x: x.coords[dim].values + np.timedelta64(*timedelta), extremes_X))
+
+            extremes_Y = Y.sel(**{dim: extreme_idx})
+            extremes_Y.coords[dim].values += np.timedelta64(*timedelta)
+
+            self._Y_extreme = xr.concat([Y, extremes_Y], dim=dim)
+            self._X_extreme = list(map(lambda x1, x2: xr.concat([x1, x2], dim=dim), X, extremes_X))
+
+    @staticmethod
+    def _add_timedelta(data, dim, timedelta):
+        for d in data:
+            d.coords[dim].values += np.timedelta64(*timedelta)
+
+
+def run_data_prep():
+
+    data = DummyDataSingleStation("main_class")
+    data.get_X()
+    data.get_Y()
+
+    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata")
+    data_prep = DataPreparation(DummyDataSingleStation("main_class"), "datetime", path,
+                                neighbors=[DummyDataSingleStation("neighbor1"), DummyDataSingleStation("neighbor2")],
+                                extreme_values=[1., 1.2])
+    data_prep.get_data(upsampling=False)
+
+
+def create_data_prep():
+
+    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata")
+    station_type = None
+    network = 'UBA'
+    sampling = 'daily'
+    target_dim = 'variables'
+    target_var = 'o3'
+    interpolate_dim = 'datetime'
+    window_history_size = 7
+    window_lead_time = 3
+    central_station = StationPrep(path, "DEBW011", {'o3': 'dma8eu', 'temp': 'maximum'}, {}, station_type, network, sampling, target_dim,
+                                  target_var, interpolate_dim, window_history_size, window_lead_time)
+    neighbor1 = StationPrep(path, "DEBW013", {'o3': 'dma8eu', 'temp-rea-miub': 'maximum'}, {}, station_type, network, sampling, target_dim,
+                            target_var, interpolate_dim, window_history_size, window_lead_time)
+    neighbor2 = StationPrep(path, "DEBW034", {'o3': 'dma8eu', 'temp': 'maximum'}, {}, station_type, network, sampling, target_dim,
+                            target_var, interpolate_dim, window_history_size, window_lead_time)
+
+    data_prep = []
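+    # each station takes the role of the primary id_class once, with the other two passed as neighbors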
+    data_prep.append(DataPreparation(central_station, interpolate_dim, path, neighbors=[neighbor1, neighbor2]))
+    data_prep.append(DataPreparation(neighbor1, interpolate_dim, path, neighbors=[central_station, neighbor2]))
+    data_prep.append(DataPreparation(neighbor2, interpolate_dim, path, neighbors=[neighbor1, central_station]))
+    return data_prep
+
+
+if __name__ == "__main__":
+    from src.data_handling.data_preparation import StationPrep
+    from src.data_handling.iterator import KerasIterator, DataCollection
+    data_prep = create_data_prep()
+    data_collection = DataCollection(data_prep)
+    for data in data_collection:
+        print(data)
+    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testdata", "keras")
+    keras_it = KerasIterator(data_collection, 100, path)
+    keras_it[2]
+
diff --git a/src/data_handling/data_preparation.py b/src/data_handling/data_preparation.py
index f2c27c6c4c5dda27236c5c5b3bf8e59e776862a1..d5933f193018efb1529db2c026981e8c4d7936d2 100644
--- a/src/data_handling/data_preparation.py
+++ b/src/data_handling/data_preparation.py
@@ -7,11 +7,13 @@ import datetime as dt
 import logging
 import os
 from functools import reduce
-from typing import Union, List, Iterable, Tuple
+from typing import Union, List, Iterable, Tuple, Dict
+from src.helpers.join import EmptyQueryResult
 
 import numpy as np
 import pandas as pd
 import xarray as xr
+import dask.array as da
 
 from src.configuration import check_path_and_create
 from src import helpers
@@ -25,6 +27,546 @@ num_or_list = Union[number, List[number]]
 data_or_none = Union[xr.DataArray, None]
 
+
+class AbstractStationPrep():
+    def __init__(self):  # , path, station, statistics_per_var, transformation, **kwargs):
+        pass
+        # passed parameters
+        # self.path = os.path.abspath(path)
+        # self.station = helpers.to_list(station)
+        # self.statistics_per_var = statistics_per_var
+        # # self.target_dim = 'variable'
+        # self.transformation = self.setup_transformation(transformation)
+        # self.kwargs = kwargs
+        #
+        # # internal
+        # self.data = None
+        # self.meta = None
+        # self.variables = kwargs.get('variables', list(statistics_per_var.keys()))
+        # self.history = None
+        # self.label = None
+        # self.observation = None
+
+
+    def get_X(self):
+        raise NotImplementedError
+
+    def get_Y(self):
+        raise NotImplementedError
+
+    # def load_data(self):
+    #     try:
+    #         self.read_data_from_disk()
+    #     except FileNotFoundError:
+    #         self.download_data()
+    #         self.load_data()
+    #
+    # def read_data_from_disk(self):
+    #     raise NotImplementedError
+    #
+    # def download_data(self):
+    #     raise NotImplementedError
+
+class StationPrep(AbstractStationPrep):
+
+    def __init__(self, path, station, statistics_per_var, transformation, station_type, network, sampling, target_dim, target_var,
+                 interpolate_dim, window_history_size, window_lead_time, **kwargs):
+        super().__init__()  # path, station, statistics_per_var, transformation, **kwargs)
+        self.station_type = station_type
+        self.network = network
+        self.sampling = sampling
+        self.target_dim = target_dim
+        self.target_var = target_var
+        self.interpolate_dim = interpolate_dim
+        self.window_history_size = window_history_size
+        self.window_lead_time = window_lead_time
+
+        self.path = os.path.abspath(path)
+        self.station = helpers.to_list(station)
+        self.statistics_per_var = statistics_per_var
+        # self.target_dim = 'variable'
+        self.transformation = self.setup_transformation(transformation)
+        self.kwargs = kwargs
+
+        # internal
+        self.data = None
+        self.meta = None
+        self.variables = kwargs.get('variables', list(statistics_per_var.keys()))
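+        # sample containers, filled by make_samples()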
+        self.history = None
+        self.label = None
+        self.observation = None
+
+    def __str__(self):
+        return self.station[0]
+
+    def load_data(self):
+        try:
+            self.read_data_from_disk()
+        except FileNotFoundError:
+            self.download_data()
+            self.load_data()
+
+    def __repr__(self):
+        return f"StationPrep(path='{self.path}', station={self.station}, statistics_per_var={self.statistics_per_var}, " \
+               f"transformation={self.transformation}, station_type='{self.station_type}', network='{self.network}', " \
+               f"sampling='{self.sampling}', target_dim='{self.target_dim}', target_var='{self.target_var}', " \
+               f"interpolate_dim='{self.interpolate_dim}', window_history_size={self.window_history_size}, " \
+               f"window_lead_time={self.window_lead_time}, **{self.kwargs})"
+
+    def get_transposed_history(self) -> xr.DataArray:
+        """Return history.
+
+        :return: history with dimensions datetime, window, Stations, variables.
+        """
+        return self.history.transpose("datetime", "window", "Stations", "variables").copy()
+
+    def get_transposed_label(self) -> xr.DataArray:
+        """Return label.
+
+        :return: label with dimensions datetime and window (the Stations dimension is squeezed out).
+        """
+        return self.label.squeeze("Stations").transpose("datetime", "window").copy()
+
+    def get_X(self):
+        return self.get_transposed_history()
+
+    def get_Y(self):
+        return self.get_transposed_label()
+
+    def make_samples(self):
+        self.load_data()
+        self.make_history_window(self.target_dim, self.window_history_size, self.interpolate_dim)
+        self.make_labels(self.target_dim, self.target_var, self.interpolate_dim, self.window_lead_time)
+        self.make_observation(self.target_dim, self.target_var, self.interpolate_dim)
+        self.remove_nan(self.interpolate_dim)
+
+    def read_data_from_disk(self, source_name=""):
+        """
+        Load data and meta data either from local disk (preferred) or download new data using a custom download method.
+
+        Data is downloaded if no local data is available or if the parameter overwrite_local_data is true. In both
+        cases, downloaded data is only stored locally if store_data_locally is not disabled. If this parameter is not
+        set, it is assumed that data should be saved locally.
+        """
+        source_name = source_name if len(source_name) == 0 else f" from {source_name}"
+        check_path_and_create(self.path)
+        file_name = self._set_file_name()
+        meta_file = self._set_meta_file_name()
+        if self.kwargs.get('overwrite_local_data', False):
+            logging.debug(f"overwrite_local_data is true, therefore reload {file_name}{source_name}")
+            if os.path.exists(file_name):
+                os.remove(file_name)
+            if os.path.exists(meta_file):
+                os.remove(meta_file)
+            data, self.meta = self.download_data(file_name, meta_file)
+            logging.debug(f"loaded new data{source_name}")
+        else:
+            try:
+                logging.debug(f"try to load local data from: {file_name}")
+                data = xr.open_dataarray(file_name)
+                self.meta = pd.read_csv(meta_file, index_col=0)
+                self.check_station_meta()
+                logging.debug("loading finished")
+            except FileNotFoundError as e:
+                logging.debug(e)
+                logging.debug(f"load new data{source_name}")
+                data, self.meta = self.download_data(file_name, meta_file)
+                logging.debug("loading finished")
+        # create slices and check for negative concentration.
+        data = self._slice_prep(data)
+        self.data = self.check_for_negative_concentrations(data)
+
+    def download_data_from_join(self, file_name: str, meta_file: str) -> Tuple[xr.DataArray, pd.DataFrame]:
+        """
+        Download data from TOAR database using the JOIN interface.
+
+        Data is transformed to an xarray dataset.
+        If class attribute store_data_locally is true, data is additionally
+        stored locally using given names for file and meta file.
+
+        :param file_name: name of file to save data to (containing full path)
+        :param meta_file: name of the meta data file (also containing full path)
+
+        :return: downloaded data and its meta data
+        """
+        df_all = {}
+        df, meta = join.download_join(station_name=self.station, stat_var=self.statistics_per_var,
+                                      station_type=self.station_type, network_name=self.network, sampling=self.sampling)
+        df_all[self.station[0]] = df
+        # convert df_all to xarray
+        xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
+        xarr = xr.Dataset(xarr).to_array(dim='Stations')
+        if self.kwargs.get('store_data_locally', True):
+            # save locally as nc/csv file
+            xarr.to_netcdf(path=file_name)
+            meta.to_csv(meta_file)
+        return xarr, meta
+
+    def download_data(self, file_name, meta_file):
+        data, meta = self.download_data_from_join(file_name, meta_file)
+        return data, meta
+
+    def check_station_meta(self):
+        """
+        Search for the entries in meta data and compare the values with the requested ones.
+
+        Will raise a FileNotFoundError if the values mismatch.
+        """
+        if self.station_type is not None:
+            check_dict = {"station_type": self.station_type, "network_name": self.network}
+            for (k, v) in check_dict.items():
+                if v is None:
+                    continue
+                if self.meta.at[k, self.station[0]] != v:
+                    logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
+                                  f"{self.meta.at[k, self.station[0]]} (local). Raise FileNotFoundError to trigger new "
+                                  f"grabbing from web.")
+                    raise FileNotFoundError
+
+    def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray:
+        """
+        Set all negative concentrations to zero.
+
+        Names of all concentrations are extracted from https://join.fz-juelich.de/services/rest/surfacedata/
+        #2.1 Parameters. Currently, this check is applied on "benzene", "ch4", "co", "ethane", "no", "no2", "nox",
+        "o3", "ox", "pm1", "pm10", "pm2p5", "propane", "so2", and "toluene".
+
+        :param data: data array containing variables to check
+        :param minimum: minimum value, by default this should be 0
+
+        :return: corrected data
+        """
+        chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5",
+                     "propane", "so2", "toluene"]
+        used_chem_vars = list(set(chem_vars) & set(self.variables))
+        data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum)
+        return data
+
+    def shift(self, dim: str, window: int) -> xr.DataArray:
+        """
+        Shift data multiple times to represent history (if window <= 0) or lead time (if window > 0).
+
+        :param dim: dimension along which the shift is applied
+        :param window: number of steps to shift (corresponds to the window length)
+
+        :return: shifted data
+        """
+        start = 1
+        end = 1
+        if window <= 0:
+            start = window
+        else:
+            end = window + 1
+        res = []
+        for w in range(start, end):
+            res.append(self.data.shift({dim: -w}))
+        window_array = self.create_index_array('window', range(start, end), squeeze_dim=self.target_dim)
+        res = xr.concat(res, dim=window_array)
+        return res
+
+    @staticmethod
+    def create_index_array(index_name: str, index_value: Iterable[int], squeeze_dim: str) -> xr.DataArray:
+        """
+        Create a 1D xr.DataArray with given index name and value.
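+
+        For example, create_index_array('window', range(1, 4), 'variables') returns a DataArray named 'window'
+        whose 'window' coordinate holds the values [1, 2, 3].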
+
+        :param index_name: name of dimension
+        :param index_value: values of this dimension
+        :param squeeze_dim: name of the dimension to squeeze out of the intermediate array
+
+        :return: this array
+        """
+        ind = pd.DataFrame({'val': index_value}, index=index_value)
+        # res = xr.Dataset.from_dataframe(ind).to_array().rename({'index': index_name}).squeeze(dim=squeeze_dim, drop=True)
+        res = xr.Dataset.from_dataframe(ind).to_array(squeeze_dim).rename({'index': index_name}).squeeze(
+            dim=squeeze_dim,
+            drop=True
+        )
+        res.name = index_name
+        return res
+
+    def _set_file_name(self):
+        all_vars = sorted(self.statistics_per_var.keys())
+        return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}.nc")
+
+    def _set_meta_file_name(self):
+        all_vars = sorted(self.statistics_per_var.keys())
+        return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}_meta.csv")
+
+    def interpolate(self, dim: str, method: str = 'linear', limit: int = None, use_coordinate: Union[bool, str] = True,
+                    **kwargs):
+        """
+        Interpolate values according to different methods.
+
+        (Copy paste from dataarray.interpolate_na)
+
+        :param dim:
+            Specifies the dimension along which to interpolate.
+        :param method:
+            {'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic',
+            'polynomial', 'barycentric', 'krog', 'pchip',
+            'spline', 'akima'}, optional
+            String indicating which method to use for interpolation:
+
+            - 'linear': linear interpolation (Default). Additional keyword
+              arguments are passed to ``numpy.interp``
+            - 'nearest', 'zero', 'slinear', 'quadratic', 'cubic',
+              'polynomial': are passed to ``scipy.interpolate.interp1d``. If
+              method=='polynomial', the ``order`` keyword argument must also be
+              provided.
+            - 'barycentric', 'krog', 'pchip', 'spline', and `akima`: use their
+              respective ``scipy.interpolate`` classes.
+        :param limit:
+            default None
+            Maximum number of consecutive NaNs to fill. Must be greater than 0
+            or None for no limit.
+        :param use_coordinate:
+            default True
+            Specifies which index to use as the x values in the interpolation
+            formulated as `y = f(x)`. If False, values are treated as if
+            equally-spaced along `dim`. If True, the IndexVariable `dim` is
+            used. If use_coordinate is a string, it specifies the name of a
+            coordinate variable to use as the index.
+        :param kwargs:
+
+        :return: xarray.DataArray
+        """
+        self.data = self.data.interpolate_na(dim=dim, method=method, limit=limit, use_coordinate=use_coordinate,
+                                             **kwargs)
+
+    def make_history_window(self, dim_name_of_inputs: str, window: int, dim_name_of_shift: str) -> None:
+        """
+        Create a xr.DataArray containing history data.
+
+        Shift the data window+1 times and return a xarray which has a new dimension 'window' containing the shifted
+        data. This is used to represent history in the data. Results are stored in the history attribute.
+
+        :param dim_name_of_inputs: Name of dimension which contains the input variables
+        :param window: number of time steps to look back in history
+            Note: window will be treated as a negative value. This should be in agreement with looking back on
+            a time line. Nonetheless, positive values are allowed but are converted to their negative expression.
+        :param dim_name_of_shift: Dimension along which the shift will be applied
+        """
+        window = -abs(window)
+        self.history = self.shift(dim_name_of_shift, window).sel({dim_name_of_inputs: self.variables})
+
+    def make_labels(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str,
+                    window: int) -> None:
+        """
+        Create a xr.DataArray containing labels.
+
+        Labels are defined as the consecutive target values (t+1, ..., t+n) following the current time step t. Sets
+        the label attribute.
+
+        :param dim_name_of_target: Name of dimension which contains the target variable
+        :param target_var: Name of target variable in 'dimension'
+        :param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied
+        :param window: lead time of label
+        """
+        window = abs(window)
+        self.label = self.shift(dim_name_of_shift, window).sel({dim_name_of_target: target_var})
+
+    def make_observation(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str) -> None:
+        """
+        Create a xr.DataArray containing observations.
+
+        Observations are defined as the value of the current time step t. Sets the observation attribute.
+
+        :param dim_name_of_target: Name of dimension which contains the observation variable
+        :param target_var: Name of observation variable(s) in 'dimension'
+        :param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied
+        """
+        self.observation = self.shift(dim_name_of_shift, 0).sel({dim_name_of_target: target_var})
+
+    def remove_nan(self, dim: str) -> None:
+        """
+        Remove all slices along dim which contain NaNs in history, label or observation.
+
+        This is done to present only a full matrix to keras.fit. Updates the history, label, and observation
+        attributes.
+
+        :param dim: dimension along which the removal is performed
+        """
+        intersect = []
+        if (self.history is not None) and (self.label is not None):
+            non_nan_history = self.history.dropna(dim=dim)
+            non_nan_label = self.label.dropna(dim=dim)
+            non_nan_observation = self.observation.dropna(dim=dim)
+            intersect = reduce(np.intersect1d, (non_nan_history.coords[dim].values, non_nan_label.coords[dim].values,
+                                                non_nan_observation.coords[dim].values))
+
+        min_length = self.kwargs.get("min_length", 0)
+        if len(intersect) < max(min_length, 1):
+            self.history = None
+            self.label = None
+            self.observation = None
+        else:
+            self.history = self.history.sel({dim: intersect})
+            self.label = self.label.sel({dim: intersect})
+            self.observation = self.observation.sel({dim: intersect})
+
+    def _slice_prep(self, data: xr.DataArray, coord: str = 'datetime') -> xr.DataArray:
+        """
+        Set start and end date for slicing and execute self._slice().
+
+        :param data: data to slice
+        :param coord: name of axis to slice
+
+        :return: sliced data
+        """
+        start = self.kwargs.get('start', data.coords[coord][0].values)
+        end = self.kwargs.get('end', data.coords[coord][-1].values)
+        return self._slice(data, start, end, coord)
+
+    @staticmethod
+    def _slice(data: xr.DataArray, start: Union[date, str], end: Union[date, str], coord: str) -> xr.DataArray:
+        """
+        Slice through a given data_item (for example select only values of 2011).
+
+        :param data: data to slice
+        :param start: start date of slice
+        :param end: end date of slice
+        :param coord: name of axis to slice
+
+        :return: sliced data
+        """
+        return data.loc[{coord: slice(str(start), str(end))}]
+
+    def setup_transformation(self, transformation: Dict):
+        """
+        Set up transformation by extracting all relevant information.
+
+        Extract all information from the transformation dictionary. Possible keys are scope, method, mean, and std.
+        Scope can either be station or data. Station scope means that the data transformation is performed for each
+        station independently (similar to batch normalisation), whereas data scope means a transformation applied on
+        the entire data set.
+
+        * If using data scope, mean and standard deviation (each only if required by the transformation method) can
+          either be calculated accurately or estimated (faster implementation). This must be set in the dictionary
+          either as "mean": "accurate" or "mean": "estimate". In both cases, the required statistics are calculated
+          and saved. After these calculations, the mean key is overwritten by the actual values to use.
+        * If using station scope, no additional information is required.
+        * If a transformation should be applied on the basis of existing values, these need to be provided in the
+          respective keys "mean" and "std" (again only if required for the given method).
+
+        :param transformation: the transformation dictionary as described above.
+
+        :return: updated transformation dictionary
+        """
+        if transformation is None:
+            return
+        transformation = transformation.copy()
+        scope = transformation.get("scope", "station")
+        method = transformation.get("method", "standardise")
+        mean = transformation.get("mean", None)
+        std = transformation.get("std", None)
+        if scope == "data":
+            if isinstance(mean, str):
+                if mean == "accurate":
+                    mean, std = self.calculate_accurate_transformation(method)
+                elif mean == "estimate":
+                    mean, std = self.calculate_estimated_transformation(method)
+                else:
+                    raise ValueError(f"given mean attribute must either be equal to strings 'accurate' or 'estimate' "
+                                     f"or be an array with already calculated means. Given was: {mean}")
+        elif scope == "station":
+            mean, std = None, None
+        else:
+            raise ValueError(f"Scope argument can either be 'station' or 'data'. Given was: {scope}")
+        transformation["method"] = method
+        transformation["mean"] = mean
+        transformation["std"] = std
+        return transformation
+
+    def calculate_accurate_transformation(self, method: str) -> Tuple[data_or_none, data_or_none]:
+        """
+        Calculate accurate transformation statistics.
+
+        Use all stations of this generator and calculate mean and standard deviation on the entire data set using
+        dask. Because there can be a lot of data, this can take a while.
+
+        :param method: name of transformation method
+
+        :return: accurate calculated mean and std (depending on transformation)
+        """
+        tmp = []
+        mean = None
+        std = None
+        for station in self.stations:
+            try:
+                data = self.DataPrep(self.data_path, station, self.variables, station_type=self.station_type,
+                                     **self.kwargs)
+                chunks = (1, 100, data.data.shape[2])
+                tmp.append(da.from_array(data.data.data, chunks=chunks))
+            except EmptyQueryResult:
+                continue
+        tmp = da.concatenate(tmp, axis=1)
+        if method in ["standardise", "centre"]:
+            mean = da.nanmean(tmp, axis=1).compute()
+            mean = xr.DataArray(mean.flatten(), coords={"variables": sorted(self.variables)}, dims=["variables"])
+            if method == "standardise":
+                std = da.nanstd(tmp, axis=1).compute()
+                std = xr.DataArray(std.flatten(), coords={"variables": sorted(self.variables)}, dims=["variables"])
+        else:
+            raise NotImplementedError
+        return mean, std
+
+    def calculate_estimated_transformation(self, method):
+        """
+        Calculate estimated transformation statistics.
+
+        Use all stations of this generator and calculate mean and standard deviation first for each station
+        separately. Afterwards, calculate the average mean and standard deviation as estimated statistics. Because
+        this method does not consider the length of each data set, the estimated mean differs from the real data
+        mean. Furthermore, the estimated standard deviation is assumed to be the mean (also not weighted) of all
+        deviations. This is not mathematically exact, but it is a rough and fast estimate of the true standard
+        deviation. Do not use this method for further statistical calculations. However, in the scope of data
+        preparation for machine learning, this approach is decent ("it is just scaling").
+
+        :param method: name of transformation method
+
+        :return: estimated mean and std (depending on transformation)
+        """
+        data = [[]] * len(self.variables)
+        coords = {"variables": self.variables, "Stations": range(0)}
+        mean = xr.DataArray(data, coords=coords, dims=["variables", "Stations"])
+        std = xr.DataArray(data, coords=coords, dims=["variables", "Stations"])
+        for station in self.stations:
+            try:
+                data = self.DataPrep(self.data_path, station, self.variables, station_type=self.station_type,
+                                     **self.kwargs)
+                data.transform("datetime", method=method)
+                mean = mean.combine_first(data.mean)
+                std = std.combine_first(data.std)
+                data.transform("datetime", method=method, inverse=True)
+            except EmptyQueryResult:
+                continue
+        return mean.mean("Stations") if mean.shape[1] > 0 else None, std.mean("Stations") if std.shape[1] > 0 else None
+
+
 class AbstractDataPrep(object):
     """
     This class prepares data to be used in neural networks.
@@ -554,5 +1096,13 @@ class AbstractDataPrep(object):
 
 
 if __name__ == "__main__":
-    dp = AbstractDataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
-    print(dp)
+    # dp = AbstractDataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'})
+    # print(dp)
+    statistics_per_var = {'o3': 'dma8eu', 'temp-rea-miub': 'maximum'}
+    sp = StationPrep(path='/home/felix/PycharmProjects/mlt_new/data/', station='DEBY122',
+                     statistics_per_var=statistics_per_var, transformation={}, station_type='background',
+                     network='UBA', sampling='daily', target_dim='variables', target_var='o3',
+                     interpolate_dim='datetime', window_history_size=7, window_lead_time=3)
+    sp.make_samples()  # load the data and build the history/label windows before accessing X and Y
+    sp.get_X()
+    sp.get_Y()
+    print(sp)
diff --git a/src/data_handling/iterator.py b/src/data_handling/iterator.py
index 51073286810f4eb6ef02beb47727932008cb9b7c..14d71a9afc23d3a0d80bacf60bbaa928fb34407a 100644
--- a/src/data_handling/iterator.py
+++ b/src/data_handling/iterator.py
@@ -136,14 +136,14 @@ class DummyData:  # pragma: no cover
         self.number_of_samples = number_of_samples
 
     def get_X(self):
-        X1 = np.random.randint(0, 10, size=(self.number_of_samples, 14, 5)) # samples, window, variables
-        X2 = np.random.randint(21, 30, size=(self.number_of_samples, 10, 2)) # samples, window, variables
-        X3 = np.random.randint(-5, 0, size=(self.number_of_samples, 1, 2)) # samples, window, variables
+        X1 = np.random.randint(0, 10, size=(self.number_of_samples, 14, 5))  # samples, window, variables
+        X2 = np.random.randint(21, 30, size=(self.number_of_samples, 10, 2))  # samples, window, variables
+        X3 = np.random.randint(-5, 0, size=(self.number_of_samples, 1, 2))  # samples, window, variables
         return [X1, X2, X3]
 
     def get_Y(self):
-        Y1 = np.random.randint(0, 10, size=(self.number_of_samples, 5, 1)) # samples, window, variables
-        Y2 = np.random.randint(21, 30, size=(self.number_of_samples, 5, 1)) # samples, window, variables
+        Y1 = np.random.randint(0, 10, size=(self.number_of_samples, 5, 1))  # samples, window, variables
+        Y2 = np.random.randint(21, 30, size=(self.number_of_samples, 5, 1))  # samples, window, variables
         return [Y1, Y2]
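
Usage sketch: a minimal example of how the classes added above are intended to fit together, mirroring the
__main__ blocks in this diff. The dummy stations and the temporary store path are placeholders; this is an
illustration, not part of the patch itself.

    import os
    import tempfile

    from src.data_handling.advanced_data_handling import DataPreparation, DummyDataSingleStation
    from src.data_handling.iterator import DataCollection, KerasIterator

    store = tempfile.mkdtemp()
    main = DummyDataSingleStation("main_class")
    neighbors = [DummyDataSingleStation("neighbor1"), DummyDataSingleStation("neighbor2")]
    # DataPreparation aligns all stations on the shared datetime index and duplicates samples with |label| > 1
    prep = DataPreparation(main, "datetime", store, neighbors=neighbors, extreme_values=[1.])
    X, Y = prep.get_data(upsampling=True)  # upsampling=True returns the extreme-enriched arrays
    data_collection = DataCollection([prep])
    keras_it = KerasIterator(data_collection, 100, os.path.join(store, "keras"))
    keras_it[0]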