From 2fadc0ffd7131b040e6eba2c2da7960db83a572b Mon Sep 17 00:00:00 2001
From: lukas leufen <l.leufen@fz-juelich.de>
Date: Mon, 20 Apr 2020 11:59:48 +0200
Subject: [PATCH] updated all doc strings in data_handling/

---
 src/data_handling/__init__.py         |   8 +-
 src/data_handling/data_generator.py   |  50 ++++++-
 src/data_handling/data_preparation.py | 195 ++++++++++++++++++--------
 3 files changed, 191 insertions(+), 62 deletions(-)

diff --git a/src/data_handling/__init__.py b/src/data_handling/__init__.py
index e03e44b2..a689cbe3 100644
--- a/src/data_handling/__init__.py
+++ b/src/data_handling/__init__.py
@@ -1,9 +1,9 @@
-__author__ = 'Lukas Leufen, Felix Kleinert'
-__date__ = '2020-04-17'
-
 """
 Data Handling.
 
-The module data_handling contains all methods and classes that are somehow related to data preprocessing,
+The module data_handling contains all methods and classes that are related to data preprocessing,
 postprocessing, loading, and distribution for training.
 """
+
+__author__ = 'Lukas Leufen, Felix Kleinert'
+__date__ = '2020-04-17'
diff --git a/src/data_handling/data_generator.py b/src/data_handling/data_generator.py
index d45713c9..b2c2549b 100644
--- a/src/data_handling/data_generator.py
+++ b/src/data_handling/data_generator.py
@@ -18,6 +18,7 @@ from src.join import EmptyQueryResult
 
 number = Union[float, int]
 num_or_list = Union[number, List[number]]
+data_or_none = Union[xr.DataArray, None]
 
 
 class DataGenerator(keras.utils.Sequence):
@@ -147,7 +148,27 @@ class DataGenerator(keras.utils.Sequence):
         data = self.get_data_generator(key=item)
         return data.get_transposed_history(), data.get_transposed_label()
 
-    def setup_transformation(self, transformation):
+    def setup_transformation(self, transformation: Dict):
+        """
+        Set up transformation by extracting all relevant information.
+
+        Extract all information from the transformation dictionary. Possible keys are scope, method, mean, and std.
+        Scope can either be station or data. Station scope means that the data transformation is performed for each
+        station independently (comparable to batch normalisation), whereas data scope means a transformation applied
+        on the entire data set.
+
+        * If using data scope, mean and standard deviation (each only if required by the transformation method) can
+          either be calculated accurately or as an estimate (faster implementation). This must be set in the
+          dictionary either as "mean": "accurate" or "mean": "estimate". In both cases, the required statistics are
+          calculated and saved. After these calculations, the mean key is overwritten by the actual values to use.
+        * If using station scope, no additional information is required.
+        * If a transformation should be applied on the basis of existing values, these need to be provided in the
+          respective keys "mean" and "std" (again only if required for the given method).
+
+        :param transformation: the transformation dictionary as described above
+
+        :return: updated transformation dictionary
+        """
         if transformation is None:
             return
         transformation = transformation.copy()
@@ -173,7 +194,17 @@ class DataGenerator(keras.utils.Sequence):
             transformation["std"] = std
         return transformation
 
-    def calculate_accurate_transformation(self, method):
+    def calculate_accurate_transformation(self, method: str) -> Tuple[data_or_none, data_or_none]:
+        """
+        Calculate accurate transformation statistics.
+
+        Use all stations of this generator and calculate mean and standard deviation on the entire data set using
+        dask. Because there can be a lot of data, this can take a while.
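+
+        The underlying idea, sketched for illustration only (this is not the actual implementation; it assumes a
+        hypothetical list ``station_data`` of xr.DataArray objects sharing a datetime dimension):
+
+        .. code-block:: python
+
+            import xarray as xr
+
+            # pool all stations along datetime, then reduce; with dask-backed arrays
+            # the reductions stay lazy until compute() is called
+            pooled = xr.concat(station_data, dim="datetime")
+            mean = pooled.mean(dim="datetime").compute()
+            std = pooled.std(dim="datetime").compute()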
+
+        :param method: name of transformation method
+
+        :return: accurately calculated mean and std (depending on transformation)
+        """
         tmp = []
         mean = None
         std = None
@@ -197,6 +228,21 @@ class DataGenerator(keras.utils.Sequence):
         return mean, std
 
     def calculate_estimated_transformation(self, method):
+        """
+        Calculate estimated transformation statistics.
+
+        Use all stations of this generator and calculate mean and standard deviation first for each station
+        separately. Afterwards, calculate the average mean and standard deviation as estimated statistics. Because
+        this method does not consider the length of each data set, the estimated mean differs from the real data
+        mean. Furthermore, the estimated standard deviation is assumed to be the (unweighted) mean of all deviations.
+        This is not mathematically correct, but it is still a rough and faster estimation of the true standard
+        deviation. Do not use this method for further statistical calculation. However, in the scope of data
+        preparation for machine learning, this approach is decent ("it is just scaling").
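+
+        A rough sketch of the estimation idea (illustrative only, not the actual implementation; again assumes a
+        hypothetical list ``station_data`` of xr.DataArray objects):
+
+        .. code-block:: python
+
+            import xarray as xr
+
+            # per-station statistics, stacked along a new Stations dimension
+            means = xr.concat([d.mean(dim="datetime") for d in station_data], dim="Stations")
+            stds = xr.concat([d.std(dim="datetime") for d in station_data], dim="Stations")
+            # unweighted averages over all stations serve as the estimate
+            mean, std = means.mean(dim="Stations"), stds.mean(dim="Stations")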
+
+        :param method: name of transformation method
+
+        :return: estimated mean and std (depending on transformation)
+        """
         data = [[]] * len(self.variables)
         coords = {"variables": self.variables, "Stations": range(0)}
         mean = xr.DataArray(data, coords=coords, dims=["variables", "Stations"])
diff --git a/src/data_handling/data_preparation.py b/src/data_handling/data_preparation.py
index 6c89c9c2..eccb66b3 100644
--- a/src/data_handling/data_preparation.py
+++ b/src/data_handling/data_preparation.py
@@ -1,3 +1,5 @@
+"""Data Preparation class to handle data processing for machine learning."""
+
 __author__ = 'Felix Kleinert, Lukas Leufen'
 __date__ = '2019-10-16'
 
@@ -19,14 +21,16 @@ date = Union[dt.date, dt.datetime]
 str_or_list = Union[str, List[str]]
 number = Union[float, int]
 num_or_list = Union[number, List[number]]
+data_or_none = Union[xr.DataArray, None]
 
 
 class DataPrep(object):
     """
-    This class prepares data to be used in neural networks. The instance searches for local stored data, that meet the
-    given demands. If no local data is found, the DataPrep instance will load data from TOAR database and store this
-    data locally to use the next time. For the moment, there is only support for daily aggregated time series. The
-    aggregation can be set manually and differ for each variable.
+    This class prepares data to be used in neural networks.
+
+    The instance searches for locally stored data that meets the given demands. If no local data is found, the
+    DataPrep instance will load data from the TOAR database and store it locally for the next use. For the moment,
+    there is only support for daily aggregated time series. The aggregation can be set manually and can differ for
+    each variable.
 
     After data loading, different data pre-processing steps can be executed to prepare the data for further
    applications. Especially the following methods can be used for the pre-processing step:
@@ -52,18 +56,19 @@ class DataPrep(object):
 
     def __init__(self, path: str, network: str, station: Union[str, List[str]], variables: List[str],
                  station_type: str = None, **kwargs):
+        """Construct instance."""
         self.path = os.path.abspath(path)
         self.network = network
         self.station = helpers.to_list(station)
         self.variables = variables
         self.station_type = station_type
-        self.mean = None
-        self.std = None
-        self.history = None
-        self.label = None
-        self.observation = None
-        self.extremes_history = None
-        self.extremes_label = None
+        self.mean: data_or_none = None
+        self.std: data_or_none = None
+        self.history: data_or_none = None
+        self.label: data_or_none = None
+        self.observation: data_or_none = None
+        self.extremes_history: data_or_none = None
+        self.extremes_label: data_or_none = None
         self.kwargs = kwargs
         self.data = None
         self.meta = None
@@ -77,8 +82,11 @@ class DataPrep(object):
 
     def load_data(self):
         """
-        Load data and meta data either from local disk (preferred) or download new data from TOAR database if no local
-        data is available. The latter case, store downloaded data locally if wished (default yes).
+        Load data and meta data either from local disk (preferred) or download new data from the TOAR database.
+
+        Data is downloaded if no local data is available or if the parameter overwrite_local_data is true. In both
+        cases, downloaded data is only stored locally if store_data_locally is not disabled. If this parameter is
+        not set, it is assumed that data should be saved locally.
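+
+        A hypothetical call forcing a fresh download without keeping a local copy could look like this (all values
+        are placeholders; the parameters are passed through kwargs as described above):
+
+        .. code-block:: python
+
+            prep = DataPrep("data/daily", "UBA", "DEBW107", ["o3", "temp"],
+                            statistics_per_var={"o3": "dma8eu", "temp": "maximum"},
+                            overwrite_local_data=True, store_data_locally=False)
+            prep.load_data()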
+        """
         helpers.check_path_and_create(self.path)
         file_name = self._set_file_name()
@@ -106,14 +114,25 @@ class DataPrep(object):
             logging.debug("loaded new data from JOIN")
 
     def download_data(self, file_name, meta_file):
+        """
+        Download data from JOIN, create slices and check for negative concentrations.
+
+        Handle the sequence of required operations on new data downloads. First, download data using the class
+        method download_data_from_join. Second, slice data using _slice_prep and lastly check for negative
+        concentrations in the data with check_for_negative_concentrations. Finally, data is stored in the instance
+        attribute data.
+
+        :param file_name: name of file to save data to (containing full path)
+        :param meta_file: name of the meta data file (also containing full path)
+        """
         data, self.meta = self.download_data_from_join(file_name, meta_file)
         data = self._slice_prep(data)
         self.data = self.check_for_negative_concentrations(data)
 
     def check_station_meta(self):
         """
-        Search for the entries in meta data and compare the value with the requested values. Raise a FileNotFoundError
-        if the values mismatch.
+        Search for the entries in meta data and compare the values with the requested ones.
+
+        Will raise a FileNotFoundError if the values mismatch.
         """
         check_dict = {"station_type": self.station_type, "network_name": self.network}
         for (k, v) in check_dict.items():
@@ -127,9 +146,13 @@ class DataPrep(object):
         """
         Download data from TOAR database using the JOIN interface.
 
-        :param file_name:
-        :param meta_file:
-        :return:
+        Data is transformed to an xarray dataset. If the class attribute store_data_locally is true, data is
+        additionally stored locally using the given names for the file and the meta file.
+
+        :param file_name: name of file to save data to (containing full path)
+        :param meta_file: name of the meta data file (also containing full path)
+
+        :return: downloaded data and its meta data
         """
         df_all = {}
         df, meta = join.download_join(station_name=self.station, stat_var=self.statistics_per_var,
@@ -153,15 +176,17 @@ class DataPrep(object):
         return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}_meta.csv")
 
     def __repr__(self):
+        """Represent class attributes."""
         return f"Dataprep(path='{self.path}', network='{self.network}', station={self.station}, " \
                f"variables={self.variables}, station_type={self.station_type}, **{self.kwargs})"
 
     def interpolate(self, dim: str, method: str = 'linear', limit: int = None,
                     use_coordinate: Union[bool, str] = True, **kwargs):
         """
-        (Copy paste from dataarray.interpolate_na) Interpolate values according to different methods.
+        Interpolate values according to different methods.
+
+        (Copy paste from dataarray.interpolate_na)
 
         :param dim: Specifies the dimension along which to interpolate.
         :param method: {'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'polynomial',
             'barycentric', 'krog', 'pchip', 'spline', 'akima'}, optional
                 String indicating which method to use for interpolation:
 
                 - 'linear': linear interpolation (Default). Additional keyword arguments are passed to numpy.interp
                 - 'nearest', 'zero', 'slinear', 'quadratic', 'cubic': are passed to scipy.interpolate.interp1d.
                   If method == 'polynomial', the order keyword argument must also be provided.
                 - 'barycentric', 'krog', 'pchip', 'spline', 'akima': use their respective scipy.interpolate classes.
 
         :param limit: Maximum number of consecutive NaNs to fill. Must be greater than 0 or None for no limit.
         :param use_coordinate: Specifies which index to use as the x values in the interpolation formulated as
             y = f(x). If False, values are treated as if equally-spaced along the default dimension. If True, the
             IndexVariable dim is used. If use_coordinate is a string, it specifies the name of a coordinate variable
             to use as the index.
         :param kwargs:
+
+        :return: xarray.DataArray
         """
-
         self.data = self.data.interpolate_na(dim=dim, method=method, limit=limit, use_coordinate=use_coordinate,
                                              **kwargs)
 
     @staticmethod
-    def check_inverse_transform_params(mean, std, method) -> None:
+    def check_inverse_transform_params(mean: data_or_none, std: data_or_none, method: str) -> None:
+        """
+        Support the inverse_transform method.
+
+        Validate if all required statistics are available for the given method. E.g. centering requires the mean
+        only, whereas normalisation requires mean and standard deviation. Will raise an AttributeError on missing
+        requirements.
+
+        :param mean: data with all mean values
+        :param std: data with all standard deviation values
+        :param method: name of transformation method
+        """
         msg = ""
         if method in ['standardise', 'centre'] and mean is None:
             msg += "mean, "
@@ -208,9 +243,12 @@ class DataPrep(object):
 
     def inverse_transform(self) -> None:
         """
-        Perform inverse transformation
+        Perform inverse transformation.
 
-        :return: None
+        Will raise an AssertionError if no transformation was performed before. Checks first if all required
+        statistics are available for inverse transformation. The class attributes data, mean and std are overwritten
+        by the new data afterwards. Thereby, mean, std, and the private transform method are set to None to indicate
+        that the current data is not transformed.
         """
 
         def f_inverse(data, mean, std, method_inverse):
@@ -232,6 +270,8 @@ class DataPrep(object):
     def transform(self, dim: Union[str, int] = 0, method: str = 'standardise', inverse: bool = False, mean=None,
                   std=None) -> None:
         """
+        Transform data according to given transformation settings.
+
         This function transforms a xarray.DataArray (along dim) or pandas.DataFrame (along axis) either with mean=0
         and std=1 (`method=standardise`) or centers the data with mean=0 and no change in data scale
         (`method=centre`). Furthermore, this sets an internal instance attribute for later inverse transformation.
 
         :param dim: dimension (for xarray.DataArray as string) or axis (for pandas.DataFrame as int) along which the
             transformation is applied
         :param method: Choose the transformation method from 'standardise' and 'centre'. 'normalise' is not
             implemented yet. This param is not used for inverse transformation.
         :param inverse: Switch between transformation and inverse transformation.
+
         :return: xarray.DataArrays or pandas.DataFrames:
                 #. mean: Mean of data
                 #. std: Standard deviation of data
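+
+        A hypothetical round trip, for illustration only (assumes daily data along a datetime dimension and a
+        previously constructed ``prep`` instance):
+
+        .. code-block:: python
+
+            prep.transform(dim='datetime', method='standardise')   # data now has mean 0 and std 1
+            prep.inverse_transform()                               # restores the original scale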
         """
         if not inverse:
@@ -278,7 +319,18 @@ class DataPrep(object):
         else:
             self.inverse_transform()
 
-    def get_transformation_information(self, variable):
+    def get_transformation_information(self, variable: str) -> Tuple[data_or_none, data_or_none, str]:
+        """
+        Extract transformation statistics and method.
+
+        Get mean and standard deviation for the given variable and the transformation method if set. If a
+        transformation depends only on particular statistics (e.g. only mean is required for centering), the
+        remaining statistics are returned with None as fill value.
+
+        :param variable: Variable for which the information on transformation is requested.
+
+        :return: mean, standard deviation and transformation method
+        """
         try:
             mean = self.mean.sel({'variables': variable}).values
         except AttributeError:
@@ -291,8 +343,10 @@ class DataPrep(object):
 
     def make_history_window(self, dim_name_of_inputs: str, window: int, dim_name_of_shift: str) -> None:
         """
-        This function uses shifts the data window+1 times and returns a xarray which has a new dimension 'window'
-        containing the shifted data. This is used to represent history in the data. Results are stored in self.history .
+        Create a xr.DataArray containing history data.
+
+        Shift the data window+1 times and return a xarray which has a new dimension 'window' containing the shifted
+        data. This is used to represent history in the data. Results are stored in the history attribute.
 
         :param dim_name_of_inputs: Name of dimension which contains the input variables
         :param window: number of time steps to look back in history
         :param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied
@@ -306,12 +360,12 @@ class DataPrep(object):
 
     def shift(self, dim: str, window: int) -> xr.DataArray:
         """
-        This function uses xarray's shift function multiple times to represent history (if window <= 0)
-        or lead time (if window > 0)
+        Shift data multiple times to represent history (if window <= 0) or lead time (if window > 0).
 
         :param dim: dimension along which shift is applied
         :param window: number of steps to shift (corresponds to the window length)
-        :return:
+
+        :return: shifted data
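+
+        The mechanics, sketched for illustration only (not the actual implementation; ``data`` stands in for any
+        xr.DataArray with a datetime dimension):
+
+        .. code-block:: python
+
+            import xarray as xr
+
+            # stack shifted copies along a new 'window' dimension; negative windows
+            # reach back into history, window 0 is the current time step
+            windows = range(-2, 1)
+            shifted = xr.concat([data.shift({"datetime": -w}) for w in windows], dim="window")
+            shifted = shifted.assign_coords(window=list(windows))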
         """
         start = 1
         end = 1
@@ -329,7 +383,10 @@ class DataPrep(object):
 
     def make_labels(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str,
                     window: int) -> None:
         """
-        This function creates a xarray.DataArray containing labels
+        Create a xr.DataArray containing labels.
+
+        Labels are defined as the consecutive target values (t+1, ..., t+n) following the current time step t. Set
+        the label attribute.
 
         :param dim_name_of_target: Name of dimension which contains the target variable
         :param target_var: Name of target variable in 'dimension'
         :param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied
@@ -341,21 +398,23 @@ class DataPrep(object):
 
     def make_observation(self, dim_name_of_target: str, target_var: str_or_list, dim_name_of_shift: str) -> None:
         """
-        This function creates a xarray.DataArray containing labels
+        Create a xr.DataArray containing observations.
+
+        Observations are defined as the value of the current time step t. Set the observation attribute.
 
-        :param dim_name_of_target: Name of dimension which contains the target variable
-        :param target_var: Name of target variable(s) in 'dimension'
+        :param dim_name_of_target: Name of dimension which contains the observation variable
+        :param target_var: Name of observation variable(s) in 'dimension'
         :param dim_name_of_shift: Name of dimension on which xarray.DataArray.shift will be applied
         """
         self.observation = self.shift(dim_name_of_shift, 0).sel({dim_name_of_target: target_var})
 
     def remove_nan(self, dim: str) -> None:
         """
-        All NAs slices in dim which contain nans in self.history or self.label are removed in both data sets.
-        This is done to present only a full matrix to keras.fit.
+        Remove all slices along dim which contain NaNs in history, label or observation.
+
+        This is done to present only a full matrix to keras.fit. Update the history, label, and observation
+        attributes.
+
+        :param dim: dimension along which the removal is performed
         """
         intersect = []
         if (self.history is not None) and (self.label is not None):
@@ -378,11 +437,12 @@ class DataPrep(object):
 
     @staticmethod
     def create_index_array(index_name: str, index_value: Iterable[int]) -> xr.DataArray:
         """
-        This Function crates a 1D xarray.DataArray with given index name and value
+        Create a 1D xr.DataArray with given index name and value.
+
+        :param index_name: name of dimension
+        :param index_value: values of this dimension
+
+        :return: the created index array
         """
         ind = pd.DataFrame({'val': index_value}, index=index_value)
         res = xr.Dataset.from_dataframe(ind).to_array().rename({'index': index_name}).squeeze(dim='variable',
                                                                                               drop=True)
@@ -391,11 +451,12 @@ class DataPrep(object):
 
     def _slice_prep(self, data: xr.DataArray, coord: str = 'datetime') -> xr.DataArray:
         """
-        This function prepares all settings for slicing and executes _slice
+        Set start and end date for slicing and execute self._slice().
 
-        :param data:
+        :param data: data to slice
         :param coord: name of axis to slice
-        :return:
+
+        :return: sliced data
         """
         start = self.kwargs.get('start', data.coords[coord][0].values)
         end = self.kwargs.get('end', data.coords[coord][-1].values)
@@ -404,24 +465,29 @@ class DataPrep(object):
 
     @staticmethod
     def _slice(data: xr.DataArray, start: Union[date, str], end: Union[date, str], coord: str) -> xr.DataArray:
         """
-        This function slices through a given data_item (for example select only values of 2011)
+        Slice through a given data_item (for example, select only values of 2011).
 
-        :param data:
-        :param start:
-        :param end:
+        :param data: data to slice
+        :param start: start date of slice
+        :param end: end date of slice
         :param coord: name of axis to slice
-        :return:
+
+        :return: sliced data
         """
         return data.loc[{coord: slice(str(start), str(end))}]
 
     def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray:
         """
-        This function sets all negative concentrations to zero. Names of all concentrations are extracted from
-        https://join.fz-juelich.de/services/rest/surfacedata/ \#2.1 Parameters
+        Set all negative concentrations to zero.
+
+        Names of all concentrations are extracted from https://join.fz-juelich.de/services/rest/surfacedata/
+        #2.1 Parameters. Currently, this check is applied to "benzene", "ch4", "co", "ethane", "no", "no2", "nox",
+        "o3", "ox", "pm1", "pm10", "pm2p5", "propane", "so2", and "toluene".
+
+        :param data: data array containing variables to check
+        :param minimum: minimum value, by default this should be 0
+
+        :return: corrected data
         """
         chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5",
                      "propane", "so2", "toluene"]
@@ -430,20 +496,38 @@ class DataPrep(object):
         return data
 
     def get_transposed_history(self) -> xr.DataArray:
+        """Return history.
+
+        :return: history with dimensions datetime, window, Stations, variables
+        """
         return self.history.transpose("datetime", "window", "Stations", "variables").copy()
 
     def get_transposed_label(self) -> xr.DataArray:
+        """Return label.
+
+        :return: label with dimensions datetime and window (Stations is squeezed out)
+        """
         return self.label.squeeze("Stations").transpose("datetime", "window").copy()
 
     def get_extremes_history(self) -> xr.DataArray:
+        """Return extremes history.
+
+        :return: extremes history with dimensions datetime, window, Stations, variables
+        """
         return self.extremes_history.transpose("datetime", "window", "Stations", "variables").copy()
 
-    def get_extremes_label(self):
+    def get_extremes_label(self) -> xr.DataArray:
+        """Return extremes label.
+
+        :return: extremes label with dimensions datetime and window (Stations is squeezed out)
+        """
         return self.extremes_label.squeeze("Stations").transpose("datetime", "window").copy()
 
     def multiply_extremes(self, extreme_values: num_or_list = 1., extremes_on_right_tail_only: bool = False,
                           timedelta: Tuple[int, str] = (1, 'm')):
         """
+        Multiply extremes.
+
         This method extracts extreme values from self.label which are defined in the argument extreme_values. One
         can also decide only to extract extremes on the right tail of the distribution. When extreme_values is a
         list of floats/ints, all values larger than extreme_values (and smaller than negative extreme_values;
         extraction is performed in standardised space) are extracted iteratively. If for example
         extreme_values = [1., 2.], then a value of 1.5 would be extracted once (for the 0th entry in the list),
         while a 2.5 would be extracted twice (once for each entry).
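+
+        The extraction rule, sketched for illustration only (not the actual implementation; ``labels`` stands in
+        for a standardised 1D array):
+
+        .. code-block:: python
+
+            import numpy as np
+
+            labels = np.array([0.5, -1.5, 1.5, 2.5])
+            for threshold in [1., 2.]:
+                # both tails; use labels > threshold for the right tail only
+                extremes = labels[np.abs(labels) > threshold]
+                # 1.5 and -1.5 are caught once (threshold 1.), 2.5 twice (thresholds 1. and 2.)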
 
         :param extreme_values: user definition of extreme
         :param extremes_on_right_tail_only: if False, also multiply values which are smaller than -extreme_values;
             if True, only extract values larger than extreme_values
         :param timedelta: used as arguments for np.timedelta in order to mark extreme values on datetime
         """
@@ -458,7 +542,6 @@ class DataPrep(object):
-        # check if labels or history is None
         if (self.label is None) or (self.history is None):
             logging.debug(f"{self.station} has `None' labels, skip multiply extremes")
-- 
GitLab