diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index ec0f1f73282979a1e69945e1ad7f6817bdf3ba12..0be52e937b963a9c277992c57e00f9db282f48a5 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -336,7 +336,7 @@ class DataHandlerSingleStation(AbstractDataHandler): os.remove(file_name) if os.path.exists(meta_file): os.remove(meta_file) - data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling, + data, meta = data_sources.download_data(file_name, meta_file, station, statistics_per_var, sampling, store_data_locally=store_data_locally, data_origin=data_origin, time_dim=self.time_dim, target_dim=self.target_dim, iter_dim=self.iter_dim) logging.debug(f"loaded new data") @@ -350,75 +350,16 @@ class DataHandlerSingleStation(AbstractDataHandler): except FileNotFoundError as e: logging.debug(e) logging.debug(f"load new data") - data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling, - store_data_locally=store_data_locally, data_origin=data_origin, - time_dim=self.time_dim, target_dim=self.target_dim, - iter_dim=self.iter_dim) + data, meta = data_sources.download_data(file_name, meta_file, station, statistics_per_var, sampling, + store_data_locally=store_data_locally, data_origin=data_origin, + time_dim=self.time_dim, target_dim=self.target_dim, + iter_dim=self.iter_dim) logging.debug("loading finished") # create slices and check for negative concentration. 
data = self._slice_prep(data, start=start, end=end) data = self.check_for_negative_concentrations(data) return data, meta - def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling, - store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM, - target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) -> [xr.DataArray, pd.DataFrame]: - """ - Download data from TOAR database using the JOIN interface or load local era5 data. - - Data is transformed to a xarray dataset. If class attribute store_data_locally is true, data is additionally - stored locally using given names for file and meta file. - - :param file_name: name of file to save data to (containing full path) - :param meta_file: name of the meta data file (also containing full path) - - :return: downloaded data and its meta data - """ - df_all = {} - df_era5, df_toar = None, None - meta_era5, meta_toar = None, None - if data_origin is not None: - era5_origin = filter_dict_by_value(data_origin, "era5", True) - era5_stats = select_from_dict(statistics_per_var, era5_origin.keys()) - toar_origin = filter_dict_by_value(data_origin, "era5", False) - toar_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False) - assert len(era5_origin) + len(toar_origin) == len(data_origin) - assert len(era5_stats) + len(toar_stats) == len(statistics_per_var) - else: - era5_origin, toar_origin = None, None - era5_stats, toar_stats = statistics_per_var, statistics_per_var - - # load data - if era5_origin is not None and len(era5_stats) > 0: - # load era5 data - df_era5, meta_era5 = data_sources.era5.load_era5(station_name=station, stat_var=era5_stats, - sampling=sampling, data_origin=era5_origin) - if toar_origin is None or len(toar_stats) > 0: - # load combined data from toar-data (v2 & v1) - df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats, - sampling=sampling, data_origin=toar_origin) - - if df_era5 is 
None and df_toar is None: - raise data_sources.toar_data.EmptyQueryResult(f"No data available for era5 and toar-data") - - df = pd.concat([df_era5, df_toar], axis=1, sort=True) - if meta_era5 is not None and meta_toar is not None: - meta = meta_era5.combine_first(meta_toar) - else: - meta = meta_era5 if meta_era5 is not None else meta_toar - meta.loc["data_origin"] = str(data_origin) - meta.loc["statistics_per_var"] = str(statistics_per_var) - - df_all[station[0]] = df - # convert df_all to xarray - xarr = {k: xr.DataArray(v, dims=[time_dim, target_dim]) for k, v in df_all.items()} - xarr = xr.Dataset(xarr).to_array(dim=iter_dim) - if store_data_locally is True: - # save locally as nc/csv file - xarr.to_netcdf(path=file_name) - meta.to_csv(meta_file) - return xarr, meta - @staticmethod def check_station_meta(meta, station, data_origin, statistics_per_var): """ diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index 69c9537b10ca583adf84480636680a99ab265a67..154f057622cf35d2af0810357e3f193307c6c499 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -22,7 +22,7 @@ import xarray as xr from mlair.data_handler.abstract_data_handler import AbstractDataHandler from mlair.helpers import remove_items, to_list, TimeTrackingWrapper -from mlair.helpers.data_sources.toar_data import EmptyQueryResult +from mlair.helpers.data_sources.data_loader import EmptyQueryResult number = Union[float, int] diff --git a/mlair/helpers/data_sources/__init__.py b/mlair/helpers/data_sources/__init__.py index 6b753bc3afb961be65ff0f934ef4f0de08804a0b..21caa40ede2e76f9ea27e5f4e67feafe244a8f52 100644 --- a/mlair/helpers/data_sources/__init__.py +++ b/mlair/helpers/data_sources/__init__.py @@ -5,6 +5,7 @@ The module data_sources collects different data sources, namely ERA5, TOAR-Data """ __author__ = "Lukas Leufen" -__date__ = "2022-07-05" +__date__ = "2023-06-01" -from . 
import era5, join, toar_data, toar_data_v2 +from . import era5, join, toar_data, toar_data_v2, data_loader +from .data_loader import download_data diff --git a/mlair/helpers/data_sources/data_loader.py b/mlair/helpers/data_sources/data_loader.py new file mode 100644 index 0000000000000000000000000000000000000000..8027e46dc0b3e03e8d0a2b93ddd8f3f3dbc67bf3 --- /dev/null +++ b/mlair/helpers/data_sources/data_loader.py @@ -0,0 +1,159 @@ +__author__ = 'Lukas Leufen' +__date__ = '2023-06-01' + +import logging +from typing import Dict, Union, List + +import requests +from requests.adapters import HTTPAdapter, Retry +# from requests.packages.urllib3.util.retry import Retry + +from mlair.helpers import filter_dict_by_value, select_from_dict, data_sources, TimeTracking +import pandas as pd +import xarray as xr + +DEFAULT_TIME_DIM = "datetime" +DEFAULT_TARGET_DIM = "variables" +DEFAULT_ITER_DIM = "Stations" + + +def download_data(file_name: str, meta_file: str, station, statistics_per_var, sampling, + store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM, + target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) -> [xr.DataArray, pd.DataFrame]: + """ + Download data from TOAR database using the JOIN interface or load local era5 data. + + Data is transformed to a xarray dataset. If class attribute store_data_locally is true, data is additionally + stored locally using given names for file and meta file. 
+ + :param file_name: name of file to save data to (containing full path) + :param meta_file: name of the meta data file (also containing full path) + + :return: downloaded data and its meta data + """ + df_all = {} + df_era5_local, df_toar = None, None + meta_era5_local, meta_toar = None, None + if data_origin is not None: + era5_local_origin = filter_dict_by_value(data_origin, "era5_local", True) + era5_local_stats = select_from_dict(statistics_per_var, era5_local_origin.keys()) + toar_origin = filter_dict_by_value(data_origin, "era5_local", False) + toar_stats = select_from_dict(statistics_per_var, era5_local_origin.keys(), filter_cond=False) + assert len(era5_local_origin) + len(toar_origin) == len(data_origin) + assert len(era5_local_stats) + len(toar_stats) == len(statistics_per_var) + else: + era5_local_origin, toar_origin = None, None + era5_local_stats, toar_stats = statistics_per_var, statistics_per_var + + # load data + if era5_local_origin is not None and len(era5_local_stats) > 0: + # load era5 data + df_era5_local, meta_era5_local = data_sources.era5.load_era5( + station_name=station, stat_var=era5_local_stats, sampling=sampling, data_origin=era5_local_origin) + if toar_origin is None or len(toar_stats) > 0: + # load combined data from toar-data (v2 & v1) + df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats, + sampling=sampling, data_origin=toar_origin) + + if df_era5_local is None and df_toar is None: + raise EmptyQueryResult(f"No data available for era5_local and toar-data") + + df = pd.concat([df_era5_local, df_toar], axis=1, sort=True) + if meta_era5_local is not None and meta_toar is not None: + meta = meta_era5_local.combine_first(meta_toar) + else: + meta = meta_era5_local if meta_era5_local is not None else meta_toar + meta.loc["data_origin"] = str(data_origin) + meta.loc["statistics_per_var"] = str(statistics_per_var) + + df_all[station[0]] = df + # convert df_all to xarray + xarr = {k: 
xr.DataArray(v, dims=[time_dim, target_dim]) for k, v in df_all.items()} + xarr = xr.Dataset(xarr).to_array(dim=iter_dim) + if store_data_locally is True: + # save locally as nc/csv file + xarr.to_netcdf(path=file_name) + meta.to_csv(meta_file) + return xarr, meta + + +class EmptyQueryResult(Exception): + """Exception that get raised if a query to JOIN returns empty results.""" + + pass + + +def get_data(opts: Dict, headers: Dict, as_json: bool = True) -> Union[Dict, List, str]: + """ + Download join data using requests framework. + + Data is returned as json like structure. Depending on the response structure, this can lead to a list or dictionary. + + :param opts: options to create the request url + :param headers: additional headers information like authorization, can be empty + :param as_json: extract response as json if true (default True) + + :return: requested data (either as list or dictionary) + """ + url = create_url(**opts) + try: + with TimeTracking(name=url): + response = retries_session().get(url, headers=headers, timeout=(5, None)) # timeout=(open, read) + if response.status_code == 200: + return response.json() if as_json is True else response.text + else: + raise EmptyQueryResult(f"There was an error (STATUS {response.status_code}) for request {url}") + except requests.exceptions.RetryError as e: + raise EmptyQueryResult(f"There was an RetryError for request {url}: {e}") + + +def correct_stat_name(stat: str) -> str: + """ + Map given statistic name to new namespace defined by mapping dict. + + Return given name stat if not element of mapping namespace. 
+ + :param stat: namespace from JOIN server + + :return: stat mapped to local namespace + """ + mapping = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'} + return mapping.get(stat, stat) + + +def create_url(base: str, service: str, param_id: Union[str, int, None] = None, + **kwargs: Union[str, int, float, None]) -> str: + """ + Create a request url with given base url, service type and arbitrarily many additional keyword arguments. + + :param base: basic url of the rest service + :param service: service type, e.g. series, stats + :param param_id: id for a distinct service, is added between ending / of service and ? of kwargs + :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum' + + :return: combined url as string + """ + url = f"{base}" + if not url.endswith("/"): + url += "/" + if service is not None: + url = f"{url}{service}" + if not url.endswith("/"): + url += "/" + if param_id is not None: + url = f"{url}{param_id}" + if len(kwargs) > 0: + url = f"{url}?{'&'.join(f'{k}={v}' for k, v in kwargs.items() if v is not None)}" + return url + + +def retries_session(max_retries=5): + retry_strategy = Retry(total=max_retries, + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods=["HEAD", "GET", "OPTIONS"]) + adapter = HTTPAdapter(max_retries=retry_strategy) + http = requests.Session() + http.mount("https://", adapter) + http.mount("http://", adapter) + return http diff --git a/mlair/helpers/data_sources/era5.py b/mlair/helpers/data_sources/era5.py index 8eb7a03b2629db1d006e03fcc9d30b2af714c270..4a26fa2a2c8450cc0a20fd59ee3aa518f38e7ec2 100644 --- a/mlair/helpers/data_sources/era5.py +++ b/mlair/helpers/data_sources/era5.py @@ -12,7 +12,7 @@ from mlair import helpers from mlair.configuration.era5_settings import era5_settings from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings from mlair.helpers.data_sources.toar_data_v2 import load_station_information,
combine_meta_data, correct_timezone -from mlair.helpers.data_sources.toar_data import EmptyQueryResult +from mlair.helpers.data_sources.data_loader import EmptyQueryResult from mlair.helpers.meteo import relative_humidity_from_dewpoint diff --git a/mlair/helpers/data_sources/join.py b/mlair/helpers/data_sources/join.py index a978b2712a83b21f3c1256b2bf0826da63bdda3a..3a97717e0104a21574618304c34d5162fb1907ed 100644 --- a/mlair/helpers/data_sources/join.py +++ b/mlair/helpers/data_sources/join.py @@ -10,7 +10,7 @@ import pandas as pd from mlair import helpers from mlair.configuration.join_settings import join_settings -from mlair.helpers.data_sources import toar_data, toar_data_v2 +from mlair.helpers.data_sources import toar_data, toar_data_v2, data_loader # join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/' @@ -49,8 +49,8 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t missing_variables = set(stat_var).difference(vars_dict) origin = helpers.select_from_dict(data_origin, missing_variables) options = f"station={station_name}, type={station_type}, network={network_name}, origin={origin}" - raise toar_data.EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in " - f"JOIN.") + raise data_loader.EmptyQueryResult( + f"No data found for variables {missing_variables} and options {options} in JOIN.") # correct stat_var values if data is not aggregated (hourly) if sampling == "hourly": @@ -71,7 +71,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t 'sampling': sampling, 'capture': 0, 'format': 'json'} # load data - data = toar_data.get_data(opts, headers) + data = data_loader.get_data(opts, headers) # adjust data format if given as list of list # no branch cover because this just happens when downloading hourly data using a secret token, not available @@ -80,7 +80,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t 
data = correct_data_format(data) # correct namespace of statistics - stat = toar_data.correct_stat_name(stat_var[var]) + stat = data_loader.correct_stat_name(stat_var[var]) # store data in pandas dataframe df = _save_to_pandas(df, data, stat, var) @@ -100,7 +100,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t meta.columns = station_name return df, meta else: - raise toar_data.EmptyQueryResult("No data found in JOIN.") + raise data_loader.EmptyQueryResult("No data found in JOIN.") def _correct_meta(meta): @@ -214,7 +214,7 @@ def load_series_information(station_name: List[str], station_type: str_or_none, opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type, "network_name": network_name_opts, "as_dict": "true", "parameter_name": parameter_name_opts, "columns": "id,network_name,station_id,parameter_name,parameter_label,parameter_attribute"} - station_vars = toar_data.get_data(opts, headers) + station_vars = data_loader.get_data(opts, headers) logging.debug(f"{station_name}: {station_vars}") return _select_distinct_series(station_vars, data_origin, network_name) diff --git a/mlair/helpers/data_sources/toar_data.py b/mlair/helpers/data_sources/toar_data.py index 27522855cbe0f3c6f0b78d1598709a694fc7b862..1181318c0cb47e68d0cddfa333e4040599de8d26 100644 --- a/mlair/helpers/data_sources/toar_data.py +++ b/mlair/helpers/data_sources/toar_data.py @@ -1,86 +1,14 @@ __author__ = "Lukas Leufen" __date__ = "2022-07-05" - -from typing import Union, List, Dict - from . 
import join, toar_data_v2 import requests -from requests.adapters import HTTPAdapter -from requests.packages.urllib3.util.retry import Retry import pandas as pd - - -class EmptyQueryResult(Exception): - """Exception that get raised if a query to JOIN returns empty results.""" - - pass - - -def create_url(base: str, service: str, param_id: Union[str, int, None] = None, - **kwargs: Union[str, int, float, None]) -> str: - """ - Create a request url with given base url, service type and arbitrarily many additional keyword arguments. - - :param base: basic url of the rest service - :param service: service type, e.g. series, stats - :param param_id: id for a distinct service, is added between ending / of service and ? of kwargs - :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum' - - :return: combined url as string - """ - url = f"{base}" - if not url.endswith("/"): - url += "/" - if service is not None: - url = f"{url}{service}" - if not url.endswith("/"): - url += "/" - if param_id is not None: - url = f"{url}{param_id}" - if len(kwargs) > 0: - url = f"{url}?{'&'.join(f'{k}={v}' for k, v in kwargs.items() if v is not None)}" - return url - - -def get_data(opts: Dict, headers: Dict, as_json: bool = True) -> Union[Dict, List, str]: - """ - Download join data using requests framework. - - Data is returned as json like structure. Depending on the response structure, this can lead to a list or dictionary. 
- - :param opts: options to create the request url - :param headers: additional headers information like authorization, can be empty - :param as_json: extract response as json if true (default True) - - :return: requested data (either as list or dictionary) - """ - url = create_url(**opts) - try: - response = retries_session().get(url, headers=headers, timeout=(5, None)) # timeout=(open, read) - if response.status_code == 200: - return response.json() if as_json is True else response.text - else: - raise EmptyQueryResult(f"There was an error (STATUS {response.status_code}) for request {url}") - except requests.exceptions.RetryError as e: - raise EmptyQueryResult(f"There was an RetryError for request {url}: {e}") - - -def retries_session(max_retries=3): - retry_strategy = Retry(total=max_retries, - backoff_factor=0.1, - status_forcelist=[429, 500, 502, 503, 504], - method_whitelist=["HEAD", "GET", "OPTIONS"]) - adapter = HTTPAdapter(max_retries=retry_strategy) - http = requests.Session() - http.mount("https://", adapter) - http.mount("http://", adapter) - return http +from .data_loader import EmptyQueryResult def download_toar(station, toar_stats, sampling, data_origin): - try: # load data from toar-data (v2) df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=data_origin) @@ -112,17 +40,3 @@ def merge_toar_join(df_toar, df_join, sampling): full_data = df_toar.reindex(full_time) full_data.update(df_join, overwrite=False) return full_data - - -def correct_stat_name(stat: str) -> str: - """ - Map given statistic name to new namespace defined by mapping dict. - - Return given name stat if not element of mapping namespace. 
- - :param stat: namespace from JOIN server - - :return: stat mapped to local namespace - """ - mapping = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'} - return mapping.get(stat, stat) diff --git a/mlair/helpers/data_sources/toar_data_v2.py b/mlair/helpers/data_sources/toar_data_v2.py index 0fa53a7eb23f11675eeef2c12a7d5dceec3c38ac..5d1cacc604f4288e48d12a72f8a24ba0d8b21fd1 100644 --- a/mlair/helpers/data_sources/toar_data_v2.py +++ b/mlair/helpers/data_sources/toar_data_v2.py @@ -13,8 +13,7 @@ from timezonefinder import TimezoneFinder from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings from mlair.helpers import to_list -from mlair.helpers.data_sources.toar_data import EmptyQueryResult, get_data, correct_stat_name - +from mlair.helpers.data_sources.data_loader import EmptyQueryResult, get_data, correct_stat_name str_or_none = Union[str, None] @@ -223,10 +222,14 @@ def select_timeseries_by_origin(toar_meta, var_origin): res = [] for origin in to_list(var_origin): for meta in toar_meta: - for roles in meta["roles"]: - if roles["contact"]["organisation"]["name"].lower() == origin.lower(): - res.append(meta) - break + if meta["data_origin"] == "instrument": + for roles in meta["roles"]: + if roles["contact"]["organisation"]["name"].lower() == origin.lower(): + res.append(meta) + break + elif meta["data_origin"].lower() == origin.lower(): + res.append(meta) + break return res diff --git a/mlair/keras_legacy/__init__.py b/mlair/keras_legacy/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py index be7421f53fe59655480339493f584c8056b5715b..5710b63336b5c3e505363b90215a8cb631c3da22 100644 --- a/mlair/run_modules/pre_processing.py +++ b/mlair/run_modules/pre_processing.py @@ -18,7 +18,7 @@ import pandas as pd from mlair.data_handler import DataCollection, AbstractDataHandler from 
mlair.helpers import TimeTracking, to_list, tables, remove_items from mlair.configuration import path_config -from mlair.helpers.data_sources.toar_data import EmptyQueryResult +from mlair.helpers.data_sources.data_loader import EmptyQueryResult from mlair.helpers.testing import check_nested_equality from mlair.run_modules.run_environment import RunEnvironment diff --git a/test/test_helpers/test_data_sources/test_join.py b/test/test_helpers/test_data_sources/test_join.py index f9b12f5a7ff20e898695de0a0f035bed023674f2..edf69e203c69a1bdd0c99c8b98cebb1ea5680fb1 100644 --- a/test/test_helpers/test_data_sources/test_join.py +++ b/test/test_helpers/test_data_sources/test_join.py @@ -7,7 +7,7 @@ from mlair.helpers.data_sources.join import _save_to_pandas, _lower_list, _selec _select_distinct_data_origin, _select_distinct_network from mlair.configuration.join_settings import join_settings from mlair.helpers.testing import check_nested_equality -from mlair.helpers.data_sources.toar_data import EmptyQueryResult +from mlair.helpers.data_sources.data_loader import EmptyQueryResult class TestDownloadJoin: diff --git a/test/test_helpers/test_data_sources/test_toar_data.py b/test/test_helpers/test_data_sources/test_toar_data.py index abaec10cc580b592d85d7dcc842616c67777f174..31af052ef9cfbe297f24ab167f8b06370a2239b7 100644 --- a/test/test_helpers/test_data_sources/test_toar_data.py +++ b/test/test_helpers/test_data_sources/test_toar_data.py @@ -1,5 +1,5 @@ from mlair.configuration.join_settings import join_settings -from mlair.helpers.data_sources.toar_data import get_data, create_url, correct_stat_name +from mlair.helpers.data_sources.data_loader import get_data, correct_stat_name, create_url class TestGetData: