diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py
new file mode 100644
index 0000000000000000000000000000000000000000..f1b5180fc00b19735c461faff72ce7b71cc90401
--- /dev/null
+++ b/mlair/data_handler/data_handler_mixed_sampling.py
@@ -0,0 +1,68 @@
+__author__ = 'Lukas Leufen'
+__date__ = '2020-11-05'
+
+from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation
+from mlair.configuration import path_config
+
+import logging
+import os
+
+import pandas as pd
+import xarray as xr
+
+
+class DataHandlerMixedSampling(DataHandlerSingleStation):
+
+    def setup_samples(self):
+        """
+        Setup samples. This method prepares and creates samples X, and labels Y.
+        """
+        self.load_data()
+        self.interpolate(dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit)
+        self.set_inputs_and_targets()
+        if self.do_transformation is True:
+            self.call_transform()
+        self.make_samples()
+
+    def load_data(self):
+        try:
+            self.read_data_from_disk()
+        except FileNotFoundError:
+            self.download_data()
+            self.load_data()
+
+    def read_data_from_disk(self, source_name=""):
+        """
+        Load data and meta data either from local disk (preferred) or download new data by using a custom download method.
+
+        Data is either downloaded, if no local data is available or parameter overwrite_local_data is true. In both
+        cases, downloaded data is only stored locally if store_data_locally is not disabled. If this parameter is not
+        set, it is assumed, that data should be saved locally.
+        """
+        source_name = source_name if len(source_name) == 0 else f" from {source_name}"
+        path_config.check_path_and_create(self.path)
+        file_name = self._set_file_name()
+        meta_file = self._set_meta_file_name()
+        if self.overwrite_local_data is True:
+            logging.debug(f"overwrite_local_data is true, therefore reload {file_name}{source_name}")
+            if os.path.exists(file_name):
+                os.remove(file_name)
+            if os.path.exists(meta_file):
+                os.remove(meta_file)
+            data, self.meta = self.download_data(file_name, meta_file)
+            logging.debug(f"loaded new data{source_name}")
+        else:
+            try:
+                logging.debug(f"try to load local data from: {file_name}")
+                data = xr.open_dataarray(file_name)
+                self.meta = pd.read_csv(meta_file, index_col=0)
+                self.check_station_meta()
+                logging.debug("loading finished")
+            except FileNotFoundError as e:
+                logging.debug(e)
+                logging.debug(f"load new data{source_name}")
+                data, self.meta = self.download_data(file_name, meta_file)
+                logging.debug("loading finished")
+        # create slices and check for negative concentration.
+        data = self._slice_prep(data)
+        self._data = self.check_for_negative_concentrations(data)
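
Note on the load flow in the new file above: read_data_from_disk raises FileNotFoundError when no local copy exists (or when the stored meta data mismatches), download_data fetches and stores the data, and load_data then retries. A minimal standalone sketch of this read-or-download retry pattern; the callables are hypothetical stand-ins, not the real MLAir methods:

    # Illustrative stand-ins: read_from_disk raises FileNotFoundError when no
    # local copy exists; download is expected to create that local copy.
    def load(read_from_disk, download):
        try:
            return read_from_disk()  # preferred: use the local copy
        except FileNotFoundError:
            download()  # fetch data and store it locally
            return load(read_from_disk, download)  # retry, mirroring the recursion above
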
diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py
index 460d1c100dadbc2aea5d43932e902cc080177b27..4cbd0c6a5f0cf856165242822b91a3fa9176a233 100644
--- a/mlair/data_handler/data_handler_single_station.py
+++ b/mlair/data_handler/data_handler_single_station.py
@@ -52,7 +52,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
                  min_length: int = 0, start=None, end=None, variables=None, **kwargs):
         super().__init__()  # path, station, statistics_per_var, transformation, **kwargs)
         self.station = helpers.to_list(station)
-        self.path = os.path.abspath(data_path)
+        self.path = os.path.abspath(data_path)  # ToDo: data_path could be a dict or list?
         self.statistics_per_var = statistics_per_var
         self.do_transformation = transformation is not None
         self.input_data, self.target_data = self.setup_transformation(transformation)
@@ -141,7 +141,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
         """
         Setup samples. This method prepares and creates samples X, and labels Y.
         """
-        self.load_data()
+        self.load_data(self.station, self.statistics_per_var, self.sampling, self.station_type, self.network,
+                       self.store_data_locally)
         self.interpolate(dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit)
         self.set_inputs_and_targets()
         if self.do_transformation is True:
@@ -160,7 +161,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
         self.make_observation(self.target_dim, self.target_var, self.time_dim)
         self.remove_nan(self.time_dim)
 
-    def read_data_from_disk(self, source_name=""):
+    def load_data(self, station, statistics_per_var, sampling, station_type=None, network=None,
+                  store_data_locally=False):
         """
         Load data and meta data either from local disk (preferred) or download new data by using a custom download method.
 
@@ -168,35 +170,41 @@ class DataHandlerSingleStation(AbstractDataHandler):
         cases, downloaded data is only stored locally if store_data_locally is not disabled. If this parameter is not
         set, it is assumed, that data should be saved locally.
         """
-        source_name = source_name if len(source_name) == 0 else f" from {source_name}"
         check_path_and_create(self.path)
-        file_name = self._set_file_name()
-        meta_file = self._set_meta_file_name()
+        file_name = self._set_file_name(self.path, station, statistics_per_var)
+        meta_file = self._set_meta_file_name(self.path, station, statistics_per_var)
         if self.overwrite_local_data is True:
-            logging.debug(f"overwrite_local_data is true, therefore reload {file_name}{source_name}")
+            logging.debug(f"overwrite_local_data is true, therefore reload {file_name}")
             if os.path.exists(file_name):
                 os.remove(file_name)
             if os.path.exists(meta_file):
                 os.remove(meta_file)
-            data, self.meta = self.download_data(file_name, meta_file)
-            logging.debug(f"loaded new data{source_name}")
+            data, self.meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
+                                                 station_type=station_type, network=network,
+                                                 store_data_locally=store_data_locally)
+            logging.debug(f"loaded new data")
         else:
            try:
                logging.debug(f"try to load local data from: {file_name}")
                data = xr.open_dataarray(file_name)
                self.meta = pd.read_csv(meta_file, index_col=0)
-                self.check_station_meta()
+                self.check_station_meta(station, station_type, network)
                logging.debug("loading finished")
            except FileNotFoundError as e:
                logging.debug(e)
-                logging.debug(f"load new data{source_name}")
-                data, self.meta = self.download_data(file_name, meta_file)
+                logging.debug(f"load new data")
+                data, self.meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
+                                                     station_type=station_type, network=network,
+                                                     store_data_locally=store_data_locally)
                logging.debug("loading finished")
         # create slices and check for negative concentration.
         data = self._slice_prep(data)
         self._data = self.check_for_negative_concentrations(data)
 
-    def download_data_from_join(self, file_name: str, meta_file: str) -> [xr.DataArray, pd.DataFrame]:
+    @staticmethod
+    def download_data_from_join(file_name: str, meta_file: str, station, statistics_per_var, sampling,
+                                station_type=None, network=None, store_data_locally=True) -> [xr.DataArray,
+                                                                                              pd.DataFrame]:
         """
         Download data from TOAR database using the JOIN interface.
 
@@ -209,36 +217,36 @@ class DataHandlerSingleStation(AbstractDataHandler):
         :return: downloaded data and its meta data
         """
         df_all = {}
-        df, meta = join.download_join(station_name=self.station, stat_var=self.statistics_per_var,
-                                      station_type=self.station_type, network_name=self.network, sampling=self.sampling)
-        df_all[self.station[0]] = df
+        df, meta = join.download_join(station_name=station, stat_var=statistics_per_var, station_type=station_type,
+                                      network_name=network, sampling=sampling)
+        df_all[station[0]] = df
         # convert df_all to xarray
         xarr = {k: xr.DataArray(v, dims=['datetime', 'variables']) for k, v in df_all.items()}
         xarr = xr.Dataset(xarr).to_array(dim='Stations')
-        if self.store_data_locally is True:
+        if store_data_locally is True:
             # save locally as nc/csv file
             xarr.to_netcdf(path=file_name)
             meta.to_csv(meta_file)
         return xarr, meta
 
-    def download_data(self, file_name, meta_file):
-        data, meta = self.download_data_from_join(file_name, meta_file)
+    def download_data(self, *args, **kwargs):
+        data, meta = self.download_data_from_join(*args, **kwargs)
         return data, meta
 
-    def check_station_meta(self):
+    def check_station_meta(self, station, station_type, network):
         """
         Search for the entries in meta data and compare the value with the requested values.
 
         Will raise a FileNotFoundError if the values mismatch.
         """
-        if self.station_type is not None:
-            check_dict = {"station_type": self.station_type, "network_name": self.network}
+        if station_type is not None:
+            check_dict = {"station_type": station_type, "network_name": network}
             for (k, v) in check_dict.items():
                 if v is None:
                     continue
-                if self.meta.at[k, self.station[0]] != v:
+                if self.meta.at[k, station[0]] != v:
                     logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
-                                  f"{self.meta.at[k, self.station[0]]} (local). Raise FileNotFoundError to trigger new "
+                                  f"{self.meta.at[k, station[0]]} (local). Raise FileNotFoundError to trigger new "
                                   f"grapping from web.")
                     raise FileNotFoundError
 
@@ -303,13 +311,15 @@ class DataHandlerSingleStation(AbstractDataHandler):
             res.name = index_name
         return res
 
-    def _set_file_name(self):
-        all_vars = sorted(self.statistics_per_var.keys())
-        return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}.nc")
+    @staticmethod
+    def _set_file_name(path, station, statistics_per_var):
+        all_vars = sorted(statistics_per_var.keys())
+        return os.path.join(path, f"{''.join(station)}_{'_'.join(all_vars)}.nc")
 
-    def _set_meta_file_name(self):
-        all_vars = sorted(self.statistics_per_var.keys())
-        return os.path.join(self.path, f"{''.join(self.station)}_{'_'.join(all_vars)}_meta.csv")
+    @staticmethod
+    def _set_meta_file_name(path, station, statistics_per_var):
+        all_vars = sorted(statistics_per_var.keys())
+        return os.path.join(path, f"{''.join(station)}_{'_'.join(all_vars)}_meta.csv")
 
     def interpolate(self, dim: str, method: str = 'linear', limit: int = None,
                     use_coordinate: Union[bool, str] = True, **kwargs):
@@ -490,13 +500,6 @@ class DataHandlerSingleStation(AbstractDataHandler):
         else:
             raise NotImplementedError("Cannot handle this.")
 
-    def load_data(self):
-        try:
-            self.read_data_from_disk()
-        except FileNotFoundError:
-            self.download_data()
-            self.load_data()
-
     def transform(self, data_class, dim: Union[str, int] = 0, transform_method: str = 'standardise',
                   inverse: bool = False, mean=None, std=None, min=None, max=None) -> None:
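
The staticmethod refactor above lets subclasses such as DataHandlerMixedSampling resolve file paths without a fully initialised instance. A minimal sketch of the resulting naming scheme, assuming the patch is applied; the path, station id, and statistics mapping are made-up example values:

    from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation

    stats = {"o3": "dma8eu", "temp": "maximum"}  # example statistics_per_var mapping
    # variable names are sorted before joining, so the file name is deterministic:
    DataHandlerSingleStation._set_file_name("/tmp/data", ["DEBW107"], stats)
    # -> '/tmp/data/DEBW107_o3_temp.nc'
    DataHandlerSingleStation._set_meta_file_name("/tmp/data", ["DEBW107"], stats)
    # -> '/tmp/data/DEBW107_o3_temp_meta.csv'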