__author__ = 'Felix Kleinert, Lukas Leufen' __date__ = '2019-10-16' import xarray as xr import pandas as pd import numpy as np import logging import os from src import join, helpers from src import statistics from typing import Union, List, Iterable import datetime as dt # define more general date type for type hinting date = Union[dt.date, dt.datetime] class DataPrep(object): """ This class prepares data to be used in neural networks. Especially the following steps can be performed - interpolate: interpolate between data points by using xarray's interpolation method - standardise: standardise data to mean=1 and std=1, or just centralise to mean=0 - make window history: to present the history (time steps before) for training/ testing; X - make labels: create target vector for training/ testing; y - remove Nans jointly from desired input and output, only keeps time steps where no NaNs are present in X AND y - some other methods to ensure that the functions above are working properly """ def __init__(self, path: str, network: str, station: Union[str, List[str]], variables: List[str], **kwargs): self.path = os.path.abspath(path) self.network = network self.station = helpers.to_list(station) self.variables = variables self.mean = None self.std = None self.history = None self.label = None self.kwargs = kwargs self.data = None self.meta = None self._transform_method = None self.statistics_per_var = kwargs.get("statistics_per_var", None) if self.statistics_per_var is not None: self.load_data() else: raise NotImplementedError # self.data, self.meta = Fkf.read_hourly_data_from_csv_to_xarray(self.path, self.network, self.station, # self.variables, **kwargs) def load_data(self): self.check_path_and_create() file_name = self._set_file_name() meta_file = self._set_meta_file_name() try: data = self._slice_prep(xr.open_dataarray(file_name)) self.data = self.check_for_negative_concentrations(data) self.meta = pd.read_csv(meta_file, index_col=0) except FileNotFoundError as e: logging.warning(e) df_all = {} df, self.meta = join.download_join(station_name=self.station, statvar=self.statistics_per_var) df_all[self.station[0]] = df # convert df_all to xarray xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()} xarr = xr.Dataset(xarr).to_array(dim='Stations') data = self._slice_prep(xarr) self.data = self.check_for_negative_concentrations(data) # save locally as nc/csv file xarr.to_netcdf(path=file_name) self.meta.to_csv(meta_file) def _set_file_name(self): return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(sorted(self.variables))}.nc") def _set_meta_file_name(self): return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(sorted(self.variables))}_meta.csv") def __repr__(self): return f"Dataprep(path='{self.path}', network='{self.network}', station={self.station}, " \ f"variables={self.variables}, **{self.kwargs})" def check_path_and_create(self): try: os.makedirs(self.path) logging.info(f"Created path: {self.path}") except FileExistsError: logging.info(f"Path already exists: {self.path}") pass def interpolate(self, dim: str, method: str = 'linear', limit: int = None, use_coordinate: Union[bool, str] = True, **kwargs): """ (Copy paste from dataarray.interpolate_na) Interpolate values according to different methods. :param dim: Specifies the dimension along which to interpolate. :param method: {'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'polynomial', 'barycentric', 'krog', 'pchip', 'spline', 'akima'}, optional String indicating which method to use for interpolation: - 'linear': linear interpolation (Default). Additional keyword arguments are passed to ``numpy.interp`` - 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'polynomial': are passed to ``scipy.interpolate.interp1d``. If method=='polynomial', the ``order`` keyword argument must also be provided. - 'barycentric', 'krog', 'pchip', 'spline', and `akima`: use their respective``scipy.interpolate`` classes. :param limit: default None Maximum number of consecutive NaNs to fill. Must be greater than 0 or None for no limit. :param use_coordinate: default True Specifies which index to use as the x values in the interpolation formulated as `y = f(x)`. If False, values are treated as if eqaully-spaced along `dim`. If True, the IndexVariable `dim` is used. If use_coordinate is a string, it specifies the name of a coordinate variariable to use as the index. :param kwargs: :return: xarray.DataArray """ self.data = self.data.interpolate_na(dim=dim, method=method, limit=limit, use_coordinate=use_coordinate, **kwargs) @staticmethod def check_inverse_transform_params(mean, std, method) -> None: msg = "" if method in ['standardise', 'centre'] and mean is None: msg += "mean, " if method == 'standardise' and std is None: msg += "std, " if len(msg) > 0: raise AttributeError(f"Inverse transform {method} can not be executed because following is None: {msg}") def inverse_transform(self) -> None: """ Perform inverse transformation :return: """ def f_inverse(data, mean, std, method_inverse): if method_inverse == 'standardise': return statistics.standardise_inverse(data, mean, std), None, None elif method_inverse == 'centre': return statistics.centre_inverse(data, mean), None, None elif method_inverse == 'normalise': raise NotImplementedError else: raise NotImplementedError if self._transform_method is None: raise AssertionError("Inverse transformation method is not set. Data cannot be inverse transformed.") self.check_inverse_transform_params(self.mean, self.std, self._transform_method) self.data, self.mean, self.std = f_inverse(self.data, self.mean, self.std, self._transform_method) self._transform_method = None def transform(self, dim: Union[str, int] = 0, method: str = 'standardise', inverse: bool = False) -> None: """ This function transforms a xarray.dataarray (along dim) or pandas.DataFrame (along axis) either with mean=0 and std=1 (`method=standardise`) or centers the data with mean=0 and no change in data scale (`method=centre`). Furthermore, this sets an internal instance attribute for later inverse transformation. This method will raise an AssertionError if an internal transform method was already set ('inverse=False') or if the internal transform method, internal mean and internal standard deviation weren't set ('inverse=True'). :param string/int dim: This param is not used for inverse transformation. | for xarray.DataArray as string: name of dimension which should be standardised | for pandas.DataFrame as int: axis of dimension which should be standardised :param method: Choose the transformation method from 'standardise' and 'centre'. 'normalise' is not implemented yet. This param is not used for inverse transformation. :param inverse: Switch between transformation and inverse transformation. :return: xarray.DataArrays or pandas.DataFrames: #. mean: Mean of data #. std: Standard deviation of data #. data: Standardised data """ def f(data): if method == 'standardise': return statistics.standardise(data, dim) elif method == 'centre': return statistics.centre(data, dim) elif method == 'normalise': # use min/max of data or given min/max raise NotImplementedError else: raise NotImplementedError if not inverse: if self._transform_method is not None: raise AssertionError(f"Transform method is already set. Therefore, data was already transformed with " f"{self._transform_method}. Please perform inverse transformation of data first.") self.mean, self.std, self.data = f(self.data) self._transform_method = method else: self.inverse_transform() def make_history_window(self, dim: str, window: int) -> None: """ This function uses shifts the data window+1 times and returns a xarray which has a new dimension 'window' containing the shifted data. This is used to represent history in the data. Results are stored in self.history . :param dim: Dimension along shift will be applied :param window: number of time steps to look back in history Note: window will be treated as negative value. This should be in agreement with looking back on a time line. Nonetheless positive values are allowed but they are converted to its negative expression """ window = -abs(window) self.history = self.shift(dim, window) def shift(self, dim: str, window: int) -> xr.DataArray: """ This function uses xarray's shift function multiple times to represent history (if window <= 0) or lead time (if window > 0) :param dim: dimension along shift is applied :param window: number of steps to shift (corresponds to the window length) :return: """ start = 1 end = 1 if window <= 0: start = window else: end = window + 1 res = [] for w in range(start, end): res.append(self.data.shift({dim: -w})) window_array = self.create_index_array('window', range(start, end)) res = xr.concat(res, dim=window_array) return res def make_labels(self, dim_name_of_target: str, target_var: str, dim_name_of_shift: str, window: int) -> None: """ This function creates a xarray.DataArray containing labels :param dim_name_of_target: Name of dimension which contains the target variable :param target_var: Name of target variable in 'dimension' :param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied :param window: lead time of label """ window = abs(window) self.label = self.shift(dim_name_of_shift, window).sel({dim_name_of_target: target_var}) def history_label_nan_remove(self, dim: str) -> None: """ All NAs slices in dim which contain nans in self.history or self.label are removed in both data sets. This is done to present only a full matrix to keras.fit. :param dim: :return: """ intersect = [] if (self.history is not None) and (self.label is not None): non_nan_history = self.history.dropna(dim=dim) non_nan_label = self.label.dropna(dim=dim) intersect = np.intersect1d(non_nan_history.coords[dim].values, non_nan_label.coords[dim].values) if len(intersect) == 0: self.history = None self.label = None else: self.history = self.history.sel({dim: intersect}) self.label = self.label.sel({dim: intersect}) @staticmethod def create_index_array(index_name: str, index_value: Iterable[int]) -> xr.DataArray: """ This Function crates a 1D xarray.DataArray with given index name and value :param index_name: :param index_value: :return: """ ind = pd.DataFrame({'val': index_value}, index=index_value) res = xr.Dataset.from_dataframe(ind).to_array().rename({'index': index_name}).squeeze(dim='variable', drop=True) res.name = index_name return res def _slice_prep(self, data: xr.DataArray, coord: str = 'datetime') -> xr.DataArray: """ This function prepares all settings for slicing and executes _slice :param data: :param coord: name of axis to slice :return: """ start = self.kwargs.get('start', data.coords[coord][0].values) end = self.kwargs.get('end', data.coords[coord][-1].values) return self._slice(data, start, end, coord) @staticmethod def _slice(data: xr.DataArray, start: Union[date, str], end: Union[date, str], coord: str) -> xr.DataArray: """ This function slices through a given data_item (for example select only values of 2011) :param data: :param start: :param end: :param coord: name of axis to slice :return: """ return data.loc[{coord: slice(start, end)}] def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray: """ This function sets all negative concentrations to zero. Names of all concentrations are extracted from https://join.fz-juelich.de/services/rest/surfacedata/ #2.1 Parameters :param data: :param minimum: :return: """ chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5", "propane", "so2", "toluene"] used_chem_vars = list(set(chem_vars) & set(self.variables)) data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum) return data if __name__ == "__main__": dp = DataPrep('data/', 'dummy', 'DEBW107', ['o3', 'temp'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}) print(dp)