From 90b0425116996e112c5d3c7907bc61a1ae2f5d73 Mon Sep 17 00:00:00 2001
From: lukas leufen
Date: Thu, 30 Jun 2022 11:23:30 +0200
Subject: [PATCH 01/11] added trim method as applied in #384

---
 mlair/run_modules/post_processing.py | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py
index 00d82f3..8c5080f 100644
--- a/mlair/run_modules/post_processing.py
+++ b/mlair/run_modules/post_processing.py
@@ -261,11 +261,17 @@ class PostProcessing(RunEnvironment):
         """Ensure the time dimension is equidistant. Dates may be missing if time steps with missing values have been dropped."""
         start_data = data.coords[dim].values[0]
         freq = {"daily": "1D", "hourly": "1H"}.get(sampling)
-        datetime_index = pd.DataFrame(index=pd.date_range(start, end, freq=freq))
+        _ind = pd.date_range(start, end, freq=freq)  # two steps required to include all hours of end interval
+        datetime_index = pd.DataFrame(index=pd.date_range(_ind.min(), _ind.max() + dt.timedelta(days=1), closed="left",
+                                                          freq=freq))
         t = data.sel({dim: start_data}, drop=True)
         res = xr.DataArray(coords=[datetime_index.index, *[t.coords[c] for c in t.coords]], dims=[dim, *t.coords])
         res = res.transpose(*data.dims)
-        res.loc[data.coords] = data
+        if data.shape == res.shape:
+            res.loc[data.coords] = data
+        else:
+            _d = data.sel({dim: slice(start, end)})
+            res.loc[_d.coords] = _d
         return res
 
     def load_competitors(self, station_name: str) -> xr.DataArray:
@@ -761,6 +767,7 @@ class PostProcessing(RunEnvironment):
         indicated by `station_name`. The name of the competitor is set in the `type` axis as indicator.
 
         This method will raise either a `FileNotFoundError` or `KeyError` if no competitor could be found for the
         given station. Either there is no file provided in the expected path or no forecast for given `competitor_name`
         in the forecast file.
+        The forecast is trimmed to the start and end of the test subset.
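# --- Editor's illustration (not part of the patch) --------------------------------------
# The hunk above builds the equidistant index in two steps so that every hour of the last
# day is covered. A minimal standalone sketch of that idea using only pandas; dates and
# frequency below are example values, not taken from the experiment configuration.
import datetime as dt
import pandas as pd

start, end, freq = "2010-01-01", "2010-12-31", "1H"
_ind = pd.date_range(start, end, freq=freq)
# extend by one day and keep only the left-closed interval, so 01:00 ... 23:00 of the end
# day are included (newer pandas versions use inclusive="left" instead of closed="left")
full_index = pd.date_range(_ind.min(), _ind.max() + dt.timedelta(days=1), closed="left", freq=freq)
# -----------------------------------------------------------------------------------------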
:param station_name: name of the station to load data for :param competitor_name: name of the model @@ -769,10 +776,12 @@ class PostProcessing(RunEnvironment): path = os.path.join(self.competitor_path, competitor_name) file = os.path.join(path, f"forecasts_{station_name}_test.nc") with xr.open_dataarray(file) as da: - data = da.load() + data = da.load() forecast = data.sel(type=[self.forecast_indicator]) forecast.coords[self.model_type_dim] = [competitor_name] - return forecast + # limit forecast to time range of test subset + start, end = self.data_store.get("start", "test"), self.data_store.get("end", "test") + return self.create_full_time_dim(forecast, self.index_dim, self._sampling, start, end) def _create_observation(self, data, _, transformation_func: Callable, normalised: bool) -> xr.DataArray: """ -- GitLab From 218b6d9a835f0832a38b5c2fe39c2e491f020651 Mon Sep 17 00:00:00 2001 From: leufen1 Date: Fri, 1 Jul 2022 14:25:02 +0200 Subject: [PATCH 02/11] first implementation of toar-data-v2, can load data (but cannot process these for now), savepoint, #396 --- mlair/configuration/toar_data_v2_settings.py | 20 +++ .../data_handler_single_station.py | 2 + mlair/helpers/join.py | 15 +- mlair/helpers/toar_data_v2.py | 130 ++++++++++++++++++ 4 files changed, 163 insertions(+), 4 deletions(-) create mode 100644 mlair/configuration/toar_data_v2_settings.py create mode 100644 mlair/helpers/toar_data_v2.py diff --git a/mlair/configuration/toar_data_v2_settings.py b/mlair/configuration/toar_data_v2_settings.py new file mode 100644 index 0000000..a8bb9f4 --- /dev/null +++ b/mlair/configuration/toar_data_v2_settings.py @@ -0,0 +1,20 @@ +"""Settings to access https://toar-data.fz-juelich.de""" +from typing import Tuple, Dict + + +def toar_data_v2_settings(sampling="daily") -> Tuple[str, Dict]: + """ + Set url for toar-data and required headers. Headers information is not required for now. + + :param sampling: temporal resolution to access. 
+ :return: Service url and optional headers + """ + if sampling == "daily": # pragma: no branch + TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/statistics/api/v1/" + headers = {} + elif sampling == "hourly" or sampling == "meta": + TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2/" + headers = {} + else: + raise NameError(f"Given sampling {sampling} is not supported, choose from either daily or hourly sampling.") + return TOAR_SERVICE_URL, headers diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index 6d3407e..cb2c28a 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -22,6 +22,7 @@ from mlair.configuration import check_path_and_create from mlair import helpers from mlair.helpers import join, statistics, TimeTrackingWrapper, filter_dict_by_value, select_from_dict, era5 from mlair.data_handler.abstract_data_handler import AbstractDataHandler +from mlair.helpers import toar_data_v2 # define a more general date type for type hinting date = Union[dt.date, dt.datetime] @@ -401,6 +402,7 @@ class DataHandlerSingleStation(AbstractDataHandler): data_origin=era5_origin) if join_origin is None or len(join_stats) > 0: # load join data + _ = toar_data_v2.download_toar(station, join_stats, sampling=sampling, data_origin=join_origin) df_join, meta_join = join.download_join(station_name=station, stat_var=join_stats, station_type=station_type, network_name=network, sampling=sampling, data_origin=join_origin) df = pd.concat([df_era5, df_join], axis=1, sort=True) diff --git a/mlair/helpers/join.py b/mlair/helpers/join.py index 9c020b3..f7f204a 100644 --- a/mlair/helpers/join.py +++ b/mlair/helpers/join.py @@ -125,7 +125,7 @@ def correct_data_format(data): return formatted -def get_data(opts: Dict, headers: Dict) -> Union[Dict, List]: +def get_data(opts: Dict, headers: Dict, as_json: bool = True) -> Union[Dict, List, str]: """ Download join data using requests framework. @@ -133,13 +133,14 @@ def get_data(opts: Dict, headers: Dict) -> Union[Dict, List]: :param opts: options to create the request url :param headers: additional headers information like authorization, can be empty + :param as_json: extract response as json if true (default True) :return: requested data (either as list or dictionary) """ url = create_url(**opts) response = retries_session().get(url, headers=headers, timeout=(5, None)) # timeout=(open, read) if response.status_code == 200: - return response.json() + return response.json() if as_json is True else response.text else: raise EmptyQueryResult(f"There was an error (STATUS {response.status_code}) for request {url}") @@ -322,7 +323,7 @@ def _lower_list(args: List[str]) -> Iterator[str]: yield string.lower() -def create_url(base: str, service: str, **kwargs: Union[str, int, float, None]) -> str: +def create_url(base: str, service: str, param_id: str = None, **kwargs: Union[str, int, float, None]) -> str: """ Create a request url with given base url, service type and arbitrarily many additional keyword arguments. 
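# --- Editor's illustration (not part of the patch) --------------------------------------
# Intended effect of the new param_id argument of create_url: the id is appended after the
# trailing slash of the service and before any query parameters. The base url mirrors the
# toar-data v2 service introduced above; station code and series id are example values.
from mlair.helpers.join import create_url  # moved to mlair.helpers.data_sources.toar_data in patch 05

create_url("https://toar-data.fz-juelich.de/api/v2/", "stationmeta", param_id="DEBW107")
# -> "https://toar-data.fz-juelich.de/api/v2/stationmeta/DEBW107"
create_url("https://toar-data.fz-juelich.de/api/v2/", "data/timeseries", param_id=17057, format="csv")
# -> "https://toar-data.fz-juelich.de/api/v2/data/timeseries/17057?format=csv"
# -----------------------------------------------------------------------------------------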
@@ -334,7 +335,13 @@ def create_url(base: str, service: str, **kwargs: Union[str, int, float, None]) """ if not base.endswith("/"): base += "/" - url = f"{base}{service}/?{'&'.join(f'{k}={v}' for k, v in kwargs.items() if v is not None)}" + url = f"{base}{service}" + if not url.endswith("/"): + url += "/" + if param_id is not None: + url = f"{url}{param_id}" + if len(kwargs) > 0: + url = f"{url}?{'&'.join(f'{k}={v}' for k, v in kwargs.items() if v is not None)}" return url diff --git a/mlair/helpers/toar_data_v2.py b/mlair/helpers/toar_data_v2.py new file mode 100644 index 0000000..766dd13 --- /dev/null +++ b/mlair/helpers/toar_data_v2.py @@ -0,0 +1,130 @@ +"""Functions to access https://toar-data.fz-juelich.de/api/v2/""" +__author__ = 'Lukas Leufen' +__date__ = '2022-06-30' + + +import logging +from typing import Iterator, Union, List, Dict +from io import StringIO + +import pandas as pd + +from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings +from mlair.helpers import to_list +from mlair.helpers.join import EmptyQueryResult, get_data + + +str_or_none = Union[str, None] + + +def download_toar(station_name: Union[str, List[str]], stat_var: dict, + sampling: str = "daily", data_origin: Dict = None): + + # make sure station_name parameter is a list + station_name = to_list(station_name) + + # also ensure that given data_origin dict is no reference + if data_origin is None or len(data_origin) == 0: + data_origin = None + else: + data_origin = {k: v for (k, v) in data_origin.items()} + + # get data connection settings for meta + meta_url_base, headers = toar_data_v2_settings("meta") + + # load variables + var_meta = load_variables_information(stat_var, meta_url_base, headers) + + # load station meta + station_meta = load_station_information(station_name, meta_url_base, headers) + + # load series information + timeseries_meta = load_timeseries_information(station_meta, var_meta, meta_url_base, headers, data_origin) + + # # correct stat_var values if data is not aggregated (hourly) + # if sampling == "hourly": + # stat_var = {key: "values" for key in stat_var.keys()} + + logging.info(f"load data for {station_meta['codes'][0]} from TOAR-DATA") + # get data connection settings for data + data_url_base, headers = toar_data_v2_settings(sampling) + + for var, meta in timeseries_meta.items(): + logging.debug(f"load {var}") + + data_var = load_timeseries_data(meta, data_url_base, headers) + + return + + +def load_timeseries_data(timeseries_meta, url_base, headers): + coll = [] + for meta in timeseries_meta: + series_id = meta["id"] + # opts = {"base": url_base, "service": f"data/timeseries/{series_id}"} + opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv"} + res = get_data(opts, headers, as_json=False) + data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True, + infer_datetime_format=True) + coll.append(data["value"]) + return coll + + +def load_station_information(station_name: List[str], url_base: str, headers: Dict): + # opts = {"base": url_base, "service": f"stationmeta/{station_name[0]}"} + opts = {"base": url_base, "service": f"stationmeta", "param_id": station_name[0]} + return get_data(opts, headers) + + +def load_timeseries_information(station_meta, var_meta, url_base: str, headers: Dict, + data_origin: Dict = None) -> [Dict, Dict]: + timeseries_id_dict = {} + missing = [] + for var, meta in var_meta.items(): + timeseries_id_dict[var] = [] + opts = {"base": url_base, "service": "search", 
"station_id": station_meta["id"], "variable_id": meta["id"]} + res = get_data(opts, headers) + if len(res) == 0: + missing.append((var, meta)) + # raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} " + # f"({station_meta['codes'][0]}) and variable id {meta['id']} ({var}).") + if data_origin is not None: + var_origin = data_origin[var] + timeseries_id_dict[var] = select_timeseries_by_origin(res, var_origin) + # if len(timeseries_id_dict[var]) == 0: + # raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} " + # f"({station_meta['codes'][0]}), variable id {meta['id']} ({var}) " + # f"and timeseries origin {var_origin}.") + if data_origin is None or len(timeseries_id_dict[var]) == 0: + timeseries_id_dict[var] = select_timeseries_by_order(res) + if len(missing) > 0: + missing = ",".join([f"{m[0]} ({m[1]['id']})" for m in missing]) + raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} " + f"({station_meta['codes'][0]}) and variables {missing}.") + return timeseries_id_dict + + +def select_timeseries_by_order(toar_meta): + order_dict = {meta["order"]: meta for meta in toar_meta} + res = [order_dict[order] for order in sorted(order_dict.keys())] + return res + + +def select_timeseries_by_origin(toar_meta, var_origin): + res = [] + for origin in to_list(var_origin): + for meta in toar_meta: + for roles in meta["roles"]: + if roles["contact"]["organisation"]["name"].lower() == origin.lower(): + res.append(meta) + break + return res + + +def load_variables_information(var_dict, url_base, headers): + var_meta_dict = {} + for var in var_dict.keys(): + # opts = {"base": url_base, "service": f"variables/{var}"} + opts = {"base": url_base, "service": f"variables", "param_id": var} + var_meta_dict[var] = get_data(opts, headers) + return var_meta_dict -- GitLab From 2fff7bd86137bafa14b1d8f7287ecd128b34c2ff Mon Sep 17 00:00:00 2001 From: leufen1 Date: Tue, 5 Jul 2022 11:37:49 +0200 Subject: [PATCH 03/11] running toar-data v2 downloads, toar-data v1 (JOIN) is also updated to use same parameters --- .../data_handler_single_station.py | 28 +++--- mlair/helpers/era5.py | 6 +- mlair/helpers/join.py | 84 +++++++++++++++--- mlair/helpers/toar_data_v2.py | 86 +++++++++++++++++-- 4 files changed, 164 insertions(+), 40 deletions(-) diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index cb2c28a..690a44f 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -382,31 +382,31 @@ class DataHandlerSingleStation(AbstractDataHandler): :return: downloaded data and its meta data """ df_all = {} - df_era5, df_join = None, None - meta_era5, meta_join = None, None + df_era5, df_toar = None, None + meta_era5, meta_toar = None, None if data_origin is not None: era5_origin = filter_dict_by_value(data_origin, "era5", True) era5_stats = select_from_dict(statistics_per_var, era5_origin.keys()) - join_origin = filter_dict_by_value(data_origin, "era5", False) - join_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False) - assert len(era5_origin) + len(join_origin) == len(data_origin) - assert len(era5_stats) + len(join_stats) == len(statistics_per_var) + toar_origin = filter_dict_by_value(data_origin, "era5", False) + toar_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False) + assert len(era5_origin) + len(toar_origin) == len(data_origin) + 
assert len(era5_stats) + len(toar_stats) == len(statistics_per_var) else: - era5_origin, join_origin = None, None - era5_stats, join_stats = statistics_per_var, statistics_per_var + era5_origin, toar_origin = None, None + era5_stats, toar_stats = statistics_per_var, statistics_per_var # load data if era5_origin is not None and len(era5_stats) > 0: # load era5 data df_era5, meta_era5 = era5.load_era5(station_name=station, stat_var=era5_stats, sampling=sampling, data_origin=era5_origin) - if join_origin is None or len(join_stats) > 0: + if toar_origin is None or len(toar_stats) > 0: # load join data - _ = toar_data_v2.download_toar(station, join_stats, sampling=sampling, data_origin=join_origin) - df_join, meta_join = join.download_join(station_name=station, stat_var=join_stats, station_type=station_type, - network_name=network, sampling=sampling, data_origin=join_origin) - df = pd.concat([df_era5, df_join], axis=1, sort=True) - meta = meta_era5 if meta_era5 is not None else meta_join + # df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=toar_origin) + df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling, + station_type=station_type, data_origin=toar_origin) + df = pd.concat([df_era5, df_toar], axis=1, sort=True) + meta = meta_era5 if meta_era5 is not None else meta_toar meta.loc["data_origin"] = str(data_origin) df_all[station[0]] = df diff --git a/mlair/helpers/era5.py b/mlair/helpers/era5.py index 8cf967a..e0fb074 100644 --- a/mlair/helpers/era5.py +++ b/mlair/helpers/era5.py @@ -36,9 +36,9 @@ def load_era5(station_name, stat_var, sampling, data_origin): meta = load_meta_data(station_name, None, None, join_url_base, headers) # sel data for station using sel method nearest - data = xr.open_mfdataset(os.path.join(data_path, file_names)) - station_dask = data.sel(lon=meta["station_lon"], lat=meta["station_lat"], method="nearest", drop=True) - station_data = station_dask.to_array().T.compute() + with xr.open_mfdataset(os.path.join(data_path, file_names)) as data: + station_dask = data.sel(lon=meta["station_lon"], lat=meta["station_lat"], method="nearest", drop=True) + station_data = station_dask.to_array().T.compute() # transform data and meta to pandas station_data = station_data.to_pandas() diff --git a/mlair/helpers/join.py b/mlair/helpers/join.py index f7f204a..409a154 100644 --- a/mlair/helpers/join.py +++ b/mlair/helpers/join.py @@ -4,7 +4,7 @@ __date__ = '2019-10-16' import datetime as dt import logging -from typing import Iterator, Union, List, Dict +from typing import Iterator, Union, List, Dict, Tuple import pandas as pd import requests @@ -25,15 +25,13 @@ class EmptyQueryResult(Exception): def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None, - network_name: str = None, sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame, - pd.DataFrame]: + sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame, pd.DataFrame]: """ Read data from JOIN/TOAR. :param station_name: Station name e.g. 
DEBY122 :param stat_var: key as variable like 'O3', values as statistics on keys like 'mean' :param station_type: set the station type like "traffic" or "background", can be none - :param network_name: set the measurement network like "UBA" or "AIRBASE", can be none :param sampling: sampling rate of the downloaded data, either set to daily or hourly (default daily) :param data_origin: additional dictionary to specify data origin as key (for variable) value (origin) pair. Valid origins are "REA" for reanalysis data and "" (empty string) for observational data. @@ -43,11 +41,8 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t # make sure station_name parameter is a list station_name = helpers.to_list(station_name) - # also ensure that given data_origin dict is no reference - if data_origin is None or len(data_origin) == 0: - data_origin = None - else: - data_origin = {k: v for (k, v) in data_origin.items()} + # split network and origin information + data_origin, network_name = split_network_and_origin(data_origin) # get data connection settings join_url_base, headers = join_settings(sampling) @@ -105,6 +100,49 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t raise EmptyQueryResult("No data found in JOIN.") +def split_network_and_origin(origin_network_dict: dict) -> Tuple[Union[None, dict], Union[None, dict]]: + """ + Split given dict into network and data origin. + + Method is required to transform Toar-Data v2 structure (using only origin) into Toar-Data v1 (JOIN) structure (which + uses origin and network parameter). Furthermore, EEA network (v2) is renamed to AIRBASE (v1). + """ + if origin_network_dict is None or len(origin_network_dict) == 0: + data_origin, network = None, None + else: + data_origin = {} + network = {} + for k, v in origin_network_dict.items(): + network[k] = [] + for _network in helpers.to_list(v): + if _network.lower() == "EEA".lower(): + network[k].append("AIRBASE") + elif _network.lower() != "REA".lower(): + network[k].append(_network) + if "REA" in v: + data_origin[k] = "REA" + else: + data_origin[k] = "" + network[k] = filter_network(network[k]) + return data_origin, network + + +def filter_network(network: list) -> Union[list, None]: + """ + Filter given list of networks. + + :param network: list of various network names (can contain duplicates) + :return: sorted list with unique entries + """ + sorted_network = [] + for v in list(filter(lambda x: x != "", network)): + if v not in sorted_network: + sorted_network.append(v) + if len(sorted_network) == 0: + sorted_network = None + return sorted_network + + def correct_data_format(data): """ Transform to the standard data format. @@ -190,7 +228,7 @@ def load_series_information(station_name: List[str], station_type: str_or_none, :return: all available series for requested station stored in an dictionary with parameter name (variable) as key and the series id as value. 
""" - network_name_opts = network_name if network_name is None else ",".join(helpers.to_list(network_name)) + network_name_opts = _create_network_name_opts(network_name) opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type, "network_name": network_name_opts, "as_dict": "true", "columns": "id,network_name,station_id,parameter_name,parameter_label,parameter_attribute"} @@ -199,6 +237,21 @@ def load_series_information(station_name: List[str], station_type: str_or_none, return _select_distinct_series(station_vars, data_origin, network_name) +def _create_network_name_opts(network_name): + if network_name is None: + network_name_opts = network_name + elif isinstance(network_name, list): + network_name_opts = ",".join(helpers.to_list(network_name)) + elif isinstance(network_name, dict): + _network = [] + for v in network_name.values(): + _network.extend(helpers.to_list(v)) + network_name_opts = ",".join(filter(lambda x: x is not None, set(_network))) + else: + raise TypeError(f"network_name parameter must be of type None, list, or dict. Given is {type(network_name)}.") + return network_name_opts + + def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_name: Union[str, List[str]] = None) -> \ [Dict, Dict]: """ @@ -207,7 +260,7 @@ def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_ data_origin = {} if data_origin is None else data_origin selected, data_origin = _select_distinct_data_origin(vars, data_origin) - network_name = [] if network_name is None else helpers.to_list(network_name) + network_name = [] if network_name is None else network_name selected = _select_distinct_network(selected, network_name) # extract id @@ -215,7 +268,7 @@ def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_ return selected, data_origin -def _select_distinct_network(vars: dict, network_name: list) -> dict: +def _select_distinct_network(vars: dict, network_name: Union[list, dict]) -> dict: """ Select distinct series regarding network name. The order the network names are provided in parameter `network_name` indicates priority (from high to low). If no network name is provided, first entry is used and a logging info is @@ -228,15 +281,18 @@ def _select_distinct_network(vars: dict, network_name: list) -> dict: indicating to use always first candidate for each variable. :return: dictionary with single series reference for each variable """ + if isinstance(network_name, (list, str)): + network_name = {var: helpers.to_list(network_name) for var in vars.keys()} selected = {} for var, series in vars.items(): res = [] - for network in network_name: + network_list = network_name.get(var, []) or [] + for network in network_list: res.extend(list(filter(lambda x: x["network_name"].upper() == network.upper(), series))) if len(res) > 0: # use first match which has the highest priority selected[var] = res[0] else: - if len(network_name) == 0: # just print message which network is used if none is provided + if len(network_list) == 0: # just print message which network is used if none is provided selected[var] = series[0] logging.info(f"Could not find a valid match for variable {var} and networks {network_name}! 
" f"Therefore, use first answer from JOIN: {series[0]}") diff --git a/mlair/helpers/toar_data_v2.py b/mlair/helpers/toar_data_v2.py index 766dd13..5cc67b6 100644 --- a/mlair/helpers/toar_data_v2.py +++ b/mlair/helpers/toar_data_v2.py @@ -19,6 +19,19 @@ str_or_none = Union[str, None] def download_toar(station_name: Union[str, List[str]], stat_var: dict, sampling: str = "daily", data_origin: Dict = None): + """ + Download data from https://toar-data.fz-juelich.de/api/v2/ + + Uses station name to indicate measurement site and keys of stat_var to indicate variable name. If data origin is + given, this method tries to load time series for this origin. In case no origin is provided, this method loads data + with the highest priority according to toar-data's order parameter. + + :param station_name: + :param stat_var: + :param sampling: + :param data_origin: + :return: + """ # make sure station_name parameter is a list station_name = to_list(station_name) @@ -49,24 +62,79 @@ def download_toar(station_name: Union[str, List[str]], stat_var: dict, # get data connection settings for data data_url_base, headers = toar_data_v2_settings(sampling) + data_dict = {} for var, meta in timeseries_meta.items(): logging.debug(f"load {var}") - - data_var = load_timeseries_data(meta, data_url_base, headers) - - return - - -def load_timeseries_data(timeseries_meta, url_base, headers): + meta, opts = prepare_meta(meta, sampling, stat_var, var) + data_var = load_timeseries_data([meta[0]], data_url_base, opts, headers)[0] + data_dict[var] = data_var + data = pd.DataFrame.from_dict(data_dict) + + meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()}) + meta = pd.DataFrame.from_dict(meta, orient='index') + meta.columns = station_name + return data, meta + + +def prepare_meta(meta, sampling, stat_var, var): + meta = meta[0] + opts = {} + if sampling == "daily": + opts["timeseries_id"] = meta.pop("id") + meta["id"] = None + opts["names"] = stat_var[var] + opts["sampling"] = sampling + return [meta], opts + + +def combine_meta_data(station_meta, timeseries_meta): + meta = {} + for k, v in station_meta.items(): + print(k) + if k == "codes": + meta[k] = v[0] + elif k in ["coordinates", "additional_metadata", "globalmeta"]: + for _key, _val in v.items(): + print(_key) + if _key == "lng": + meta["lon"] = _val + else: + meta[_key] = _val + elif k in ["changelog", "roles", "annotations", "aux_images", "aux_docs", "aux_urls"]: + continue + else: + meta[k] = v + for var, var_meta in timeseries_meta.items(): + print(var) + for k, v in var_meta.items(): + print(k) + if k in ["additional_metadata", "station", "programme", "annotations", "changelog"]: + continue + elif k == "roles": + for _key, _val in v[0]["contact"]["organisation"].items(): + new_k = f"{var}_organisation_{_key}" + meta[new_k] = _val + elif k == "variable": + for _key, _val in v.items(): + new_k = f"{var}_{_key}" + meta[new_k] = _val + else: + new_k = f"{var}_{k}" + meta[new_k] = v + return meta + + +def load_timeseries_data(timeseries_meta, url_base, opts, headers): coll = [] for meta in timeseries_meta: series_id = meta["id"] # opts = {"base": url_base, "service": f"data/timeseries/{series_id}"} - opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv"} + opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv", **opts} res = get_data(opts, headers, as_json=False) data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True, 
infer_datetime_format=True) - coll.append(data["value"]) + data = data["value"].rename(meta["variable"]["name"]) + coll.append(data) return coll -- GitLab From f31196a0b5c21b71c2a5b4849a090ad45598e728 Mon Sep 17 00:00:00 2001 From: leufen1 Date: Tue, 5 Jul 2022 15:30:42 +0200 Subject: [PATCH 04/11] update tests for join --- mlair/helpers/join.py | 7 +++-- test/test_helpers/test_join.py | 48 +++++++++++++++++++++++++++------- 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/mlair/helpers/join.py b/mlair/helpers/join.py index 409a154..6d38887 100644 --- a/mlair/helpers/join.py +++ b/mlair/helpers/join.py @@ -247,6 +247,7 @@ def _create_network_name_opts(network_name): for v in network_name.values(): _network.extend(helpers.to_list(v)) network_name_opts = ",".join(filter(lambda x: x is not None, set(_network))) + network_name_opts = None if len(network_name_opts) == 0 else network_name_opts else: raise TypeError(f"network_name parameter must be of type None, list, or dict. Given is {type(network_name)}.") return network_name_opts @@ -286,7 +287,7 @@ def _select_distinct_network(vars: dict, network_name: Union[list, dict]) -> dic selected = {} for var, series in vars.items(): res = [] - network_list = network_name.get(var, []) or [] + network_list = helpers.to_list(network_name.get(var, []) or []) for network in network_list: res.extend(list(filter(lambda x: x["network_name"].upper() == network.upper(), series))) if len(res) > 0: # use first match which has the highest priority @@ -379,12 +380,14 @@ def _lower_list(args: List[str]) -> Iterator[str]: yield string.lower() -def create_url(base: str, service: str, param_id: str = None, **kwargs: Union[str, int, float, None]) -> str: +def create_url(base: str, service: str, param_id: Union[str, int, None] = None, + **kwargs: Union[str, int, float, None]) -> str: """ Create a request url with given base url, service type and arbitrarily many additional keyword arguments. :param base: basic url of the rest service :param service: service type, e.g. series, stats + :param param_id: id for a distinct service, is added between ending / of service and ? of kwargs :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum' :return: combined url as string diff --git a/test/test_helpers/test_join.py b/test/test_helpers/test_join.py index c309b26..9a79d45 100644 --- a/test/test_helpers/test_join.py +++ b/test/test_helpers/test_join.py @@ -30,7 +30,7 @@ class TestDownloadJoin: with pytest.raises(EmptyQueryResult) as e: download_join("DEBW107", {"o3": "dma8eu", "o10": "maximum"}, "background", data_origin={"o10": ""}) assert e.value.args[-1] == "No data found for variables {'o10'} and options station=['DEBW107'], " \ - "type=background, network=None, origin={'o10': ''} in JOIN." + "type=background, network={'o10': None}, origin={'o10': ''} in JOIN." class TestCorrectDataFormat: @@ -160,7 +160,8 @@ class TestSelectDistinctNetwork: 'parameter_label': 'PRESS-REA-MIUB', 'parameter_attribute': 'REA'}} assert check_nested_equality(res, expected) is True - message = "Could not find a valid match for variable %s and networks []! Therefore, use first answer from JOIN:" + message = "Could not find a valid match for variable %s and networks {'no2': [], 'o3': [], 'cloudcover': [], " \ + "'temp': [], 'press': []}! 
Therefore, use first answer from JOIN:" assert message % "no2" in caplog.messages[0] assert message % "o3" in caplog.messages[1] assert message % "cloudcover" in caplog.messages[2] @@ -185,13 +186,16 @@ class TestSelectDistinctNetwork: def test_single_network_given_no_match(self, vars): with pytest.raises(ValueError) as e: # AIRBASE not avail for all variables _select_distinct_network(vars, ["AIRBASE"]) - assert e.value.args[-1] == "Cannot find a valid match for requested networks ['AIRBASE'] and variable " \ - "no2 as only following networks are available in JOIN: ['UBA']" + assert e.value.args[-1] == "Cannot find a valid match for requested networks {'no2': ['AIRBASE'], 'o3': " \ + "['AIRBASE'], 'cloudcover': ['AIRBASE'], 'temp': ['AIRBASE'], 'press': ['AIRBASE']" \ + "} and variable no2 as only following networks are available in JOIN: ['UBA']" with pytest.raises(ValueError) as e: # both requested networks are not available for all variables _select_distinct_network(vars, ["LUBW", "EMEP"]) - assert e.value.args[-1] == "Cannot find a valid match for requested networks ['LUBW', 'EMEP'] and variable " \ - "no2 as only following networks are available in JOIN: ['UBA']" + assert e.value.args[-1] == "Cannot find a valid match for requested networks {'no2': ['LUBW', 'EMEP'], 'o3': " \ + "['LUBW', 'EMEP'], 'cloudcover': ['LUBW', 'EMEP'], 'temp': ['LUBW', 'EMEP'], " \ + "'press': ['LUBW', 'EMEP']} and variable no2 as only following networks are " \ + "available in JOIN: ['UBA']" def test_multiple_networks_given(self, vars): res = _select_distinct_network(vars, ["UBA", "AIRBASE"]) @@ -222,6 +226,22 @@ class TestSelectDistinctNetwork: 'parameter_name': 'press', 'parameter_label': 'PRESS', 'parameter_attribute': 'REA'}} assert check_nested_equality(res, expected) is True + def test_multiple_networks_given_by_dict(self, vars): + res = _select_distinct_network(vars, {"no2": "UBA", "o3": ["UBA", "AIRBASE"], "temp": ["AIRBASE", "UBA"], + "press": ["AIRBASE", "UBA"]}) + expected = { + "no2": {'id': 16686, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'no2', + 'parameter_label': 'NO2', 'parameter_attribute': ''}, + "o3": {'id': 16687, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'o3', + 'parameter_label': 'O3', 'parameter_attribute': ''}, + "cloudcover": {'id': 54036, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'cloudcover', + 'parameter_label': 'CLOUDCOVER', 'parameter_attribute': 'REA'}, + "temp": {'id': 88491, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'temp', + 'parameter_label': 'TEMP-REA-MIUB', 'parameter_attribute': 'REA'}, + "press": {'id': 26692, 'network_name': 'AIRBASE', 'station_id': 'DENW053', + 'parameter_name': 'press', 'parameter_label': 'PRESS', 'parameter_attribute': 'REA'}} + assert check_nested_equality(res, expected) is True + class TestSelectDistinctSeries: @@ -274,8 +294,9 @@ class TestSelectDistinctSeries: def test_network_not_available(self, vars): with pytest.raises(ValueError) as e: _select_distinct_series(vars, network_name="AIRBASE") - assert e.value.args[-1] == "Cannot find a valid match for requested networks ['AIRBASE'] and variable " \ - "no2 as only following networks are available in JOIN: ['UBA']" + assert e.value.args[-1] == "Cannot find a valid match for requested networks {'no2': ['AIRBASE'], 'o3': " \ + "['AIRBASE'], 'cloudcover': ['AIRBASE'], 'temp': ['AIRBASE'], 'press': ['AIRBASE']" \ + "} and variable no2 as only following networks are available in JOIN: ['UBA']" def 
test_different_network_and_origin(self, vars): origin = {"no2": "test", "temp": "", "cloudcover": "REA"} @@ -349,7 +370,7 @@ class TestCreateUrl: def test_minimal_args_given(self): url = create_url("www.base.edu", "testingservice") - assert url == "www.base.edu/testingservice/?" + assert url == "www.base.edu/testingservice/" def test_given_kwargs(self): url = create_url("www.base2.edu/", "testingservice", mood="happy", confidence=0.98) @@ -362,3 +383,12 @@ class TestCreateUrl: def test_none_kwargs(self): url = create_url("www.base2.edu/", "testingservice", mood="sad", happiness=None, stress_factor=100) assert url == "www.base2.edu/testingservice/?mood=sad&stress_factor=100" + + def test_param_id(self): + url = create_url("www.base.edu", "testingservice", param_id="2001") + assert url == "www.base.edu/testingservice/2001" + + def test_param_id_kwargs(self): + url = create_url("www.base.edu", "testingservice", param_id=2001, mood="sad", happiness=None, stress_factor=100) + assert url == "www.base.edu/testingservice/?2001&mood=sad&stress_factor=100" + -- GitLab From ee34c882abfa37fd9f63345aa6f6d1808515141e Mon Sep 17 00:00:00 2001 From: leufen1 Date: Wed, 6 Jul 2022 10:45:30 +0200 Subject: [PATCH 05/11] restructured data loading modules --- .../data_handler_single_station.py | 31 ++++-- mlair/data_handler/default_data_handler.py | 2 +- mlair/helpers/data_sources/__init__.py | 10 ++ mlair/helpers/{ => data_sources}/era5.py | 29 +++-- mlair/helpers/{ => data_sources}/join.py | 103 +++++------------- mlair/helpers/data_sources/toar_data.py | 89 +++++++++++++++ .../{ => data_sources}/toar_data_v2.py | 9 +- mlair/run_modules/pre_processing.py | 2 +- .../{ => test_data_sources}/test_join.py | 58 ++-------- .../test_data_sources/test_toar_data.py | 40 +++++++ 10 files changed, 219 insertions(+), 154 deletions(-) create mode 100644 mlair/helpers/data_sources/__init__.py rename mlair/helpers/{ => data_sources}/era5.py (63%) rename mlair/helpers/{ => data_sources}/join.py (83%) create mode 100644 mlair/helpers/data_sources/toar_data.py rename mlair/helpers/{ => data_sources}/toar_data_v2.py (96%) rename test/test_helpers/{ => test_data_sources}/test_join.py (88%) create mode 100644 test/test_helpers/test_data_sources/test_toar_data.py diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index 690a44f..516fab7 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -20,9 +20,9 @@ import xarray as xr from mlair.configuration import check_path_and_create from mlair import helpers -from mlair.helpers import join, statistics, TimeTrackingWrapper, filter_dict_by_value, select_from_dict, era5 +from mlair.helpers import statistics, TimeTrackingWrapper, filter_dict_by_value, select_from_dict from mlair.data_handler.abstract_data_handler import AbstractDataHandler -from mlair.helpers import toar_data_v2 +from mlair.helpers import data_sources # define a more general date type for type hinting date = Union[dt.date, dt.datetime] @@ -382,8 +382,8 @@ class DataHandlerSingleStation(AbstractDataHandler): :return: downloaded data and its meta data """ df_all = {} - df_era5, df_toar = None, None - meta_era5, meta_toar = None, None + df_era5, df_toar, df_join = None, None, None + meta_era5, meta_toar, meta_join = None, None, None if data_origin is not None: era5_origin = filter_dict_by_value(data_origin, "era5", True) era5_stats = select_from_dict(statistics_per_var, era5_origin.keys()) @@ 
-398,13 +398,24 @@ class DataHandlerSingleStation(AbstractDataHandler): # load data if era5_origin is not None and len(era5_stats) > 0: # load era5 data - df_era5, meta_era5 = era5.load_era5(station_name=station, stat_var=era5_stats, sampling=sampling, - data_origin=era5_origin) + df_era5, meta_era5 = data_sources.era5.load_era5(station_name=station, stat_var=era5_stats, + sampling=sampling, data_origin=era5_origin) if toar_origin is None or len(toar_stats) > 0: - # load join data - # df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=toar_origin) - df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling, - station_type=station_type, data_origin=toar_origin) + # load combined ata from toar-data (v2 & v1) + df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats, + sampling=sampling, data_origin=toar_origin, + station_type=station_type) + + # # load data from toar-data (v2) + # df_toar, meta_toar = toar_data.download_toar(station, toar_stats, sampling=sampling, data_origin=toar_origin) + # + # # load join data (toar-data v1) + # df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling, + # station_type=station_type, data_origin=toar_origin) + # + # # fill-up toar-data with join data + # a = 1 + df = pd.concat([df_era5, df_toar], axis=1, sort=True) meta = meta_era5 if meta_era5 is not None else meta_toar meta.loc["data_origin"] = str(data_origin) diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index 300e043..8ba78f1 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -22,7 +22,7 @@ import xarray as xr from mlair.data_handler.abstract_data_handler import AbstractDataHandler from mlair.helpers import remove_items, to_list, TimeTrackingWrapper -from mlair.helpers.join import EmptyQueryResult +from mlair.helpers.data_sources.toar_data import EmptyQueryResult number = Union[float, int] diff --git a/mlair/helpers/data_sources/__init__.py b/mlair/helpers/data_sources/__init__.py new file mode 100644 index 0000000..6b753bc --- /dev/null +++ b/mlair/helpers/data_sources/__init__.py @@ -0,0 +1,10 @@ +""" +Data Sources. + +The module data_sources collects different data sources, namely ERA5, TOAR-Data v1 (JOIN), and TOAR-Data v2 +""" + +__author__ = "Lukas Leufen" +__date__ = "2022-07-05" + +from . 
import era5, join, toar_data, toar_data_v2 diff --git a/mlair/helpers/era5.py b/mlair/helpers/data_sources/era5.py similarity index 63% rename from mlair/helpers/era5.py rename to mlair/helpers/data_sources/era5.py index e0fb074..a4f60af 100644 --- a/mlair/helpers/era5.py +++ b/mlair/helpers/data_sources/era5.py @@ -5,14 +5,14 @@ __date__ = "2022-06-09" import logging import os -import numpy as np import pandas as pd import xarray as xr from mlair import helpers from mlair.configuration.era5_settings import era5_settings -from mlair.configuration.join_settings import join_settings -from mlair.helpers.join import load_meta_data, EmptyQueryResult +from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings +from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data +from mlair.helpers.data_sources.toar_data import EmptyQueryResult from mlair.helpers.meteo import relative_humidity_from_dewpoint @@ -30,14 +30,15 @@ def load_era5(station_name, stat_var, sampling, data_origin): else: raise ValueError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.") - # get data connection settings - # load series information (lat/lon) from join database - join_url_base, headers = join_settings() - meta = load_meta_data(station_name, None, None, join_url_base, headers) + # load station meta using toar-data v2 API + meta_url_base, headers = toar_data_v2_settings("meta") + station_meta = load_station_information(station_name, meta_url_base, headers) # sel data for station using sel method nearest + logging.info(f"load data for {station_meta['codes'][0]} from ERA5") with xr.open_mfdataset(os.path.join(data_path, file_names)) as data: - station_dask = data.sel(lon=meta["station_lon"], lat=meta["station_lat"], method="nearest", drop=True) + lon, lat = station_meta["coordinates"]["lng"], station_meta["coordinates"]["lat"] + station_dask = data.sel(lon=lon, lat=lat, method="nearest", drop=True) station_data = station_dask.to_array().T.compute() # transform data and meta to pandas @@ -55,10 +56,20 @@ def load_era5(station_name, stat_var, sampling, data_origin): else: station_data = station_data[stat_var] - meta = pd.DataFrame.from_dict(meta, orient="index", columns=station_name) + variable_meta = _emulate_meta_data(station_data) + meta = combine_meta_data(station_meta, variable_meta) + meta = pd.DataFrame.from_dict(meta, orient='index') + meta.columns = station_name return station_data, meta +def _emulate_meta_data(station_data): + general_meta = {"sampling_frequency": "hourly", "data_origin": "model", "data_origin_type": "model"} + roles_meta = {"roles": [{"contact": {"organisation": {"name": "ERA5", "longname": "ECMWF"}}}]} + variable_meta = {var: {"variable": {"name": var}, **roles_meta, ** general_meta} for var in station_data.columns} + return variable_meta + + def _rename_era5_variables(era5_names): mapper = {"SP": "press", "U10M": "u", "V10M": "v", "T2M": "temp", "D2M": "dew", "BLH": "pblheight", "TCC": "cloudcover", "RHw": "relhum"} diff --git a/mlair/helpers/join.py b/mlair/helpers/data_sources/join.py similarity index 83% rename from mlair/helpers/join.py rename to mlair/helpers/data_sources/join.py index 6d38887..0ae1af1 100644 --- a/mlair/helpers/join.py +++ b/mlair/helpers/data_sources/join.py @@ -7,23 +7,16 @@ import logging from typing import Iterator, Union, List, Dict, Tuple import pandas as pd -import requests -from requests.adapters import HTTPAdapter -from requests.packages.urllib3.util.retry import Retry from mlair 
import helpers from mlair.configuration.join_settings import join_settings +from mlair.helpers.data_sources import toar_data + # join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/' str_or_none = Union[str, None] -class EmptyQueryResult(Exception): - """Exception that get raised if a query to JOIN returns empty results.""" - - pass - - def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None, sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame, pd.DataFrame]: """ @@ -49,14 +42,15 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t # load series information vars_dict, data_origin = load_series_information(station_name, station_type, network_name, join_url_base, headers, - data_origin) + data_origin, stat_var) # check if all requested variables are available if set(stat_var).issubset(vars_dict) is False: missing_variables = set(stat_var).difference(vars_dict) origin = helpers.select_from_dict(data_origin, missing_variables) options = f"station={station_name}, type={station_type}, network={network_name}, origin={origin}" - raise EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in JOIN.") + raise toar_data.EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in " + f"JOIN.") # correct stat_var values if data is not aggregated (hourly) if sampling == "hourly": @@ -76,7 +70,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t 'sampling': sampling, 'capture': 0, 'format': 'json'} # load data - data = get_data(opts, headers) + data = toar_data.get_data(opts, headers) # adjust data format if given as list of list # no branch cover because this just happens when downloading hourly data using a secret token, not available @@ -97,7 +91,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t meta.columns = station_name return df, meta else: - raise EmptyQueryResult("No data found in JOIN.") + raise toar_data.EmptyQueryResult("No data found in JOIN.") def split_network_and_origin(origin_network_dict: dict) -> Tuple[Union[None, dict], Union[None, dict]]: @@ -163,38 +157,6 @@ def correct_data_format(data): return formatted -def get_data(opts: Dict, headers: Dict, as_json: bool = True) -> Union[Dict, List, str]: - """ - Download join data using requests framework. - - Data is returned as json like structure. Depending on the response structure, this can lead to a list or dictionary. 
- - :param opts: options to create the request url - :param headers: additional headers information like authorization, can be empty - :param as_json: extract response as json if true (default True) - - :return: requested data (either as list or dictionary) - """ - url = create_url(**opts) - response = retries_session().get(url, headers=headers, timeout=(5, None)) # timeout=(open, read) - if response.status_code == 200: - return response.json() if as_json is True else response.text - else: - raise EmptyQueryResult(f"There was an error (STATUS {response.status_code}) for request {url}") - - -def retries_session(max_retries=3): - retry_strategy = Retry(total=max_retries, - backoff_factor=0.1, - status_forcelist=[429, 500, 502, 503, 504], - method_whitelist=["HEAD", "GET", "OPTIONS"]) - adapter = HTTPAdapter(max_retries=retry_strategy) - http = requests.Session() - http.mount("https://", adapter) - http.mount("http://", adapter) - return http - - def load_meta_data(station_name: List[str], station_type: str_or_none, network_name: str_or_none, join_url_base: str, headers: Dict) -> [Dict, Dict]: opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type, @@ -210,11 +172,11 @@ def load_meta_data(station_name: List[str], station_type: str_or_none, network_n "google_resolution,station_comments,station_max_population_density_5km"} if network_name is None: opts["columns"] = opts["columns"].replace(",network_name", "") - return get_data(opts, headers)[-1] + return toar_data.get_data(opts, headers)[-1] def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none, - join_url_base: str, headers: Dict, data_origin: Dict = None) -> [Dict, Dict]: + join_url_base: str, headers: Dict, data_origin: Dict = None, stat_var: Dict = None) -> [Dict, Dict]: """ List all series ids that are available for given station id and network name. @@ -229,14 +191,23 @@ def load_series_information(station_name: List[str], station_type: str_or_none, and the series id as value. """ network_name_opts = _create_network_name_opts(network_name) + parameter_name_opts = _create_parameter_name_opts(stat_var) opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type, - "network_name": network_name_opts, "as_dict": "true", + "network_name": network_name_opts, "as_dict": "true", "parameter_name": parameter_name_opts, "columns": "id,network_name,station_id,parameter_name,parameter_label,parameter_attribute"} - station_vars = get_data(opts, headers) + station_vars = toar_data.get_data(opts, headers) logging.debug(f"{station_name}: {station_vars}") return _select_distinct_series(station_vars, data_origin, network_name) +def _create_parameter_name_opts(stat_var): + if stat_var is None: + parameter_name_opts = None + else: + parameter_name_opts = ",".join(stat_var.keys()) + return parameter_name_opts + + def _create_network_name_opts(network_name): if network_name is None: network_name_opts = network_name @@ -253,8 +224,8 @@ def _create_network_name_opts(network_name): return network_name_opts -def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_name: Union[str, List[str]] = None) -> \ - [Dict, Dict]: +def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_name: Union[str, List[str]] = None) \ + -> [Dict, Dict]: """ Select distinct series ids for all variables. Also check if a parameter is from REA or not. 
""" @@ -295,10 +266,10 @@ def _select_distinct_network(vars: dict, network_name: Union[list, dict]) -> dic else: if len(network_list) == 0: # just print message which network is used if none is provided selected[var] = series[0] - logging.info(f"Could not find a valid match for variable {var} and networks {network_name}! " - f"Therefore, use first answer from JOIN: {series[0]}") + logging.info(f"Could not find a valid match for variable {var} and networks {network_name.get(var, [])}" + f"! Therefore, use first answer from JOIN: {series[0]}") else: # raise error if network name is provided but no match could be found - raise ValueError(f"Cannot find a valid match for requested networks {network_name} and " + raise ValueError(f"Cannot find a valid match for requested networks {network_name.get(var, [])} and " f"variable {var} as only following networks are available in JOIN: " f"{list(map(lambda x: x['network_name'], series))}") return selected @@ -380,30 +351,6 @@ def _lower_list(args: List[str]) -> Iterator[str]: yield string.lower() -def create_url(base: str, service: str, param_id: Union[str, int, None] = None, - **kwargs: Union[str, int, float, None]) -> str: - """ - Create a request url with given base url, service type and arbitrarily many additional keyword arguments. - - :param base: basic url of the rest service - :param service: service type, e.g. series, stats - :param param_id: id for a distinct service, is added between ending / of service and ? of kwargs - :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum' - - :return: combined url as string - """ - if not base.endswith("/"): - base += "/" - url = f"{base}{service}" - if not url.endswith("/"): - url += "/" - if param_id is not None: - url = f"{url}{param_id}" - if len(kwargs) > 0: - url = f"{url}?{'&'.join(f'{k}={v}' for k, v in kwargs.items() if v is not None)}" - return url - - if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) var_all_dic = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', diff --git a/mlair/helpers/data_sources/toar_data.py b/mlair/helpers/data_sources/toar_data.py new file mode 100644 index 0000000..70d6223 --- /dev/null +++ b/mlair/helpers/data_sources/toar_data.py @@ -0,0 +1,89 @@ +__author__ = "Lukas Leufen" +__date__ = "2022-07-05" + + +from typing import Union, List, Dict + +from . import join, toar_data_v2 + +import requests +from requests.adapters import HTTPAdapter +from requests.packages.urllib3.util.retry import Retry + + +class EmptyQueryResult(Exception): + """Exception that get raised if a query to JOIN returns empty results.""" + + pass + + +def create_url(base: str, service: str, param_id: Union[str, int, None] = None, + **kwargs: Union[str, int, float, None]) -> str: + """ + Create a request url with given base url, service type and arbitrarily many additional keyword arguments. + + :param base: basic url of the rest service + :param service: service type, e.g. series, stats + :param param_id: id for a distinct service, is added between ending / of service and ? of kwargs + :param kwargs: keyword pairs for optional request specifications, e.g. 
'statistics=maximum' + + :return: combined url as string + """ + if not base.endswith("/"): + base += "/" + url = f"{base}{service}" + if not url.endswith("/"): + url += "/" + if param_id is not None: + url = f"{url}{param_id}" + if len(kwargs) > 0: + url = f"{url}?{'&'.join(f'{k}={v}' for k, v in kwargs.items() if v is not None)}" + return url + + +def get_data(opts: Dict, headers: Dict, as_json: bool = True) -> Union[Dict, List, str]: + """ + Download join data using requests framework. + + Data is returned as json like structure. Depending on the response structure, this can lead to a list or dictionary. + + :param opts: options to create the request url + :param headers: additional headers information like authorization, can be empty + :param as_json: extract response as json if true (default True) + + :return: requested data (either as list or dictionary) + """ + url = create_url(**opts) + response = retries_session().get(url, headers=headers, timeout=(5, None)) # timeout=(open, read) + if response.status_code == 200: + return response.json() if as_json is True else response.text + else: + raise EmptyQueryResult(f"There was an error (STATUS {response.status_code}) for request {url}") + + +def retries_session(max_retries=3): + retry_strategy = Retry(total=max_retries, + backoff_factor=0.1, + status_forcelist=[429, 500, 502, 503, 504], + method_whitelist=["HEAD", "GET", "OPTIONS"]) + adapter = HTTPAdapter(max_retries=retry_strategy) + http = requests.Session() + http.mount("https://", adapter) + http.mount("http://", adapter) + return http + + +def download_toar(station, toar_stats, sampling, data_origin, station_type=None): + + # load data from toar-data (v2) + df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=data_origin) + + # load join data (toar-data v1) + df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling, + station_type=station_type, data_origin=data_origin) + + return df_toar + + +def merge_toar_join(df_toar, df_join): + start_date = min([df_toar.index.min(), df_join.index.min()]) \ No newline at end of file diff --git a/mlair/helpers/toar_data_v2.py b/mlair/helpers/data_sources/toar_data_v2.py similarity index 96% rename from mlair/helpers/toar_data_v2.py rename to mlair/helpers/data_sources/toar_data_v2.py index 5cc67b6..bf85dd9 100644 --- a/mlair/helpers/toar_data_v2.py +++ b/mlair/helpers/data_sources/toar_data_v2.py @@ -4,14 +4,14 @@ __date__ = '2022-06-30' import logging -from typing import Iterator, Union, List, Dict +from typing import Union, List, Dict from io import StringIO import pandas as pd from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings from mlair.helpers import to_list -from mlair.helpers.join import EmptyQueryResult, get_data +from mlair.helpers.data_sources.toar_data import EmptyQueryResult, get_data str_or_none = Union[str, None] @@ -90,12 +90,10 @@ def prepare_meta(meta, sampling, stat_var, var): def combine_meta_data(station_meta, timeseries_meta): meta = {} for k, v in station_meta.items(): - print(k) if k == "codes": meta[k] = v[0] elif k in ["coordinates", "additional_metadata", "globalmeta"]: for _key, _val in v.items(): - print(_key) if _key == "lng": meta["lon"] = _val else: @@ -105,9 +103,7 @@ def combine_meta_data(station_meta, timeseries_meta): else: meta[k] = v for var, var_meta in timeseries_meta.items(): - print(var) for k, v in var_meta.items(): - print(k) if k in ["additional_metadata", "station", "programme", 
"annotations", "changelog"]: continue elif k == "roles": @@ -192,7 +188,6 @@ def select_timeseries_by_origin(toar_meta, var_origin): def load_variables_information(var_dict, url_base, headers): var_meta_dict = {} for var in var_dict.keys(): - # opts = {"base": url_base, "service": f"variables/{var}"} opts = {"base": url_base, "service": f"variables", "param_id": var} var_meta_dict[var] = get_data(opts, headers) return var_meta_dict diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py index 0e416ac..de70002 100644 --- a/mlair/run_modules/pre_processing.py +++ b/mlair/run_modules/pre_processing.py @@ -18,7 +18,7 @@ import pandas as pd from mlair.data_handler import DataCollection, AbstractDataHandler from mlair.helpers import TimeTracking, to_list, tables from mlair.configuration import path_config -from mlair.helpers.join import EmptyQueryResult +from mlair.helpers.data_sources.toar_data import EmptyQueryResult from mlair.run_modules.run_environment import RunEnvironment diff --git a/test/test_helpers/test_join.py b/test/test_helpers/test_data_sources/test_join.py similarity index 88% rename from test/test_helpers/test_join.py rename to test/test_helpers/test_data_sources/test_join.py index 9a79d45..0a9715f 100644 --- a/test/test_helpers/test_join.py +++ b/test/test_helpers/test_data_sources/test_join.py @@ -2,11 +2,12 @@ from typing import Iterable import pytest -from mlair.helpers.join import * -from mlair.helpers.join import _save_to_pandas, _correct_stat_name, _lower_list, _select_distinct_series, \ +from mlair.helpers.data_sources.join import * +from mlair.helpers.data_sources.join import _save_to_pandas, _correct_stat_name, _lower_list, _select_distinct_series, \ _select_distinct_data_origin, _select_distinct_network from mlair.configuration.join_settings import join_settings from mlair.helpers.testing import check_nested_equality +from mlair.helpers.data_sources.toar_data import EmptyQueryResult class TestDownloadJoin: @@ -46,14 +47,6 @@ class TestCorrectDataFormat: "metadata": {"station": "test_station_001", "author": "ME", "success": True}} -class TestGetData: - - def test(self): - opts = {"base": join_settings()[0], "service": "series", "station_id": 'DEBW107', "network_name": "UBA", - "parameter_name": "o3,no2"} - assert get_data(opts, headers={}) == [[17057, 'UBA', 'DEBW107', 'O3'], [17058, 'UBA', 'DEBW107', 'NO2']] - - class TestLoadSeriesInformation: def test_standard_query(self): @@ -160,8 +153,7 @@ class TestSelectDistinctNetwork: 'parameter_label': 'PRESS-REA-MIUB', 'parameter_attribute': 'REA'}} assert check_nested_equality(res, expected) is True - message = "Could not find a valid match for variable %s and networks {'no2': [], 'o3': [], 'cloudcover': [], " \ - "'temp': [], 'press': []}! Therefore, use first answer from JOIN:" + message = "Could not find a valid match for variable %s and networks []! 
Therefore, use first answer from JOIN:" assert message % "no2" in caplog.messages[0] assert message % "o3" in caplog.messages[1] assert message % "cloudcover" in caplog.messages[2] @@ -186,16 +178,13 @@ class TestSelectDistinctNetwork: def test_single_network_given_no_match(self, vars): with pytest.raises(ValueError) as e: # AIRBASE not avail for all variables _select_distinct_network(vars, ["AIRBASE"]) - assert e.value.args[-1] == "Cannot find a valid match for requested networks {'no2': ['AIRBASE'], 'o3': " \ - "['AIRBASE'], 'cloudcover': ['AIRBASE'], 'temp': ['AIRBASE'], 'press': ['AIRBASE']" \ - "} and variable no2 as only following networks are available in JOIN: ['UBA']" + assert e.value.args[-1] == "Cannot find a valid match for requested networks ['AIRBASE'] and variable no2 as " \ + "only following networks are available in JOIN: ['UBA']" with pytest.raises(ValueError) as e: # both requested networks are not available for all variables _select_distinct_network(vars, ["LUBW", "EMEP"]) - assert e.value.args[-1] == "Cannot find a valid match for requested networks {'no2': ['LUBW', 'EMEP'], 'o3': " \ - "['LUBW', 'EMEP'], 'cloudcover': ['LUBW', 'EMEP'], 'temp': ['LUBW', 'EMEP'], " \ - "'press': ['LUBW', 'EMEP']} and variable no2 as only following networks are " \ - "available in JOIN: ['UBA']" + assert e.value.args[-1] == "Cannot find a valid match for requested networks ['LUBW', 'EMEP'] and variable " \ + "no2 as only following networks are available in JOIN: ['UBA']" def test_multiple_networks_given(self, vars): res = _select_distinct_network(vars, ["UBA", "AIRBASE"]) @@ -294,9 +283,8 @@ class TestSelectDistinctSeries: def test_network_not_available(self, vars): with pytest.raises(ValueError) as e: _select_distinct_series(vars, network_name="AIRBASE") - assert e.value.args[-1] == "Cannot find a valid match for requested networks {'no2': ['AIRBASE'], 'o3': " \ - "['AIRBASE'], 'cloudcover': ['AIRBASE'], 'temp': ['AIRBASE'], 'press': ['AIRBASE']" \ - "} and variable no2 as only following networks are available in JOIN: ['UBA']" + assert e.value.args[-1] == "Cannot find a valid match for requested networks ['AIRBASE'] and variable no2 as " \ + "only following networks are available in JOIN: ['UBA']" def test_different_network_and_origin(self, vars): origin = {"no2": "test", "temp": "", "cloudcover": "REA"} @@ -366,29 +354,3 @@ class TestLowerList: assert list(list_iterator) == ["capitalised", "already_small", "uppercase", "verystrange"] -class TestCreateUrl: - - def test_minimal_args_given(self): - url = create_url("www.base.edu", "testingservice") - assert url == "www.base.edu/testingservice/" - - def test_given_kwargs(self): - url = create_url("www.base2.edu/", "testingservice", mood="happy", confidence=0.98) - assert url == "www.base2.edu/testingservice/?mood=happy&confidence=0.98" - - def test_single_kwargs(self): - url = create_url("www.base2.edu/", "testingservice", mood="undefined") - assert url == "www.base2.edu/testingservice/?mood=undefined" - - def test_none_kwargs(self): - url = create_url("www.base2.edu/", "testingservice", mood="sad", happiness=None, stress_factor=100) - assert url == "www.base2.edu/testingservice/?mood=sad&stress_factor=100" - - def test_param_id(self): - url = create_url("www.base.edu", "testingservice", param_id="2001") - assert url == "www.base.edu/testingservice/2001" - - def test_param_id_kwargs(self): - url = create_url("www.base.edu", "testingservice", param_id=2001, mood="sad", happiness=None, stress_factor=100) - assert url == 
"www.base.edu/testingservice/?2001&mood=sad&stress_factor=100" - diff --git a/test/test_helpers/test_data_sources/test_toar_data.py b/test/test_helpers/test_data_sources/test_toar_data.py new file mode 100644 index 0000000..277a637 --- /dev/null +++ b/test/test_helpers/test_data_sources/test_toar_data.py @@ -0,0 +1,40 @@ +from mlair.configuration.join_settings import join_settings +from mlair.helpers.data_sources.toar_data import get_data, create_url + + +class TestGetData: + + def test(self): + opts = {"base": join_settings()[0], "service": "series", "station_id": 'DEBW107', "network_name": "UBA", + "parameter_name": "o3,no2"} + assert get_data(opts, headers={}) == [[17057, 'UBA', 'DEBW107', 'O3'], [17058, 'UBA', 'DEBW107', 'NO2']] + + +class TestCreateUrl: + + def test_minimal_args_given(self): + url = create_url("www.base.edu", "testingservice") + assert url == "www.base.edu/testingservice/" + + def test_given_kwargs(self): + url = create_url("www.base2.edu/", "testingservice", mood="happy", confidence=0.98) + assert url == "www.base2.edu/testingservice/?mood=happy&confidence=0.98" + + def test_single_kwargs(self): + url = create_url("www.base2.edu/", "testingservice", mood="undefined") + assert url == "www.base2.edu/testingservice/?mood=undefined" + + def test_none_kwargs(self): + url = create_url("www.base2.edu/", "testingservice", mood="sad", happiness=None, stress_factor=100) + assert url == "www.base2.edu/testingservice/?mood=sad&stress_factor=100" + + def test_param_id(self): + url = create_url("www.base.edu", "testingservice", param_id="2001") + assert url == "www.base.edu/testingservice/2001" + + def test_param_id_kwargs(self): + url = create_url("www.base.edu", "testingservice", param_id=2001, mood="sad", happiness=None, stress_factor=100) + assert url == "www.base.edu/testingservice/2001?mood=sad&stress_factor=100" + + url = create_url("www.base.edu", "testingservice", param_id=2001, mood="sad", series_id=222) + assert url == "www.base.edu/testingservice/2001?mood=sad&series_id=222" -- GitLab From 50e6d0bffd33f035bf536142b363f6cee8827bf9 Mon Sep 17 00:00:00 2001 From: leufen1 Date: Thu, 7 Jul 2022 15:32:27 +0200 Subject: [PATCH 06/11] removed network parameter, toar_data loads from both v2 and v1 and combines data, data is now in local time (without DST), data handler now raises error when no data intersection is available --- HPC_setup/requirements_HDFML_additionals.txt | 1 + HPC_setup/requirements_JUWELS_additionals.txt | 1 + mlair/configuration/defaults.py | 1 - .../data_handler_mixed_sampling.py | 7 +- .../data_handler_single_station.py | 85 +++++++++---------- .../data_handler/data_handler_with_filter.py | 4 +- mlair/data_handler/default_data_handler.py | 6 +- mlair/helpers/data_sources/era5.py | 5 +- mlair/helpers/data_sources/join.py | 59 ++++++++----- mlair/helpers/data_sources/toar_data.py | 33 ++++--- mlair/helpers/data_sources/toar_data_v2.py | 70 ++++++++++++--- mlair/run_modules/experiment_setup.py | 2 +- mlair/run_modules/post_processing.py | 1 + mlair/run_modules/pre_processing.py | 7 +- requirements.txt | 1 + 15 files changed, 175 insertions(+), 108 deletions(-) diff --git a/HPC_setup/requirements_HDFML_additionals.txt b/HPC_setup/requirements_HDFML_additionals.txt index ebfac3c..6102da7 100644 --- a/HPC_setup/requirements_HDFML_additionals.txt +++ b/HPC_setup/requirements_HDFML_additionals.txt @@ -9,6 +9,7 @@ pytest-lazy-fixture==0.6.3 pytest-metadata==1.11.0 pytest-sugar==0.9.4 tabulate==0.8.8 +timezonefinder==5.2.0 wget==3.2 --no-binary shapely 
Shapely==1.7.0 diff --git a/HPC_setup/requirements_JUWELS_additionals.txt b/HPC_setup/requirements_JUWELS_additionals.txt index ebfac3c..6102da7 100644 --- a/HPC_setup/requirements_JUWELS_additionals.txt +++ b/HPC_setup/requirements_JUWELS_additionals.txt @@ -9,6 +9,7 @@ pytest-lazy-fixture==0.6.3 pytest-metadata==1.11.0 pytest-sugar==0.9.4 tabulate==0.8.8 +timezonefinder==5.2.0 wget==3.2 --no-binary shapely Shapely==1.7.0 diff --git a/mlair/configuration/defaults.py b/mlair/configuration/defaults.py index b630261..f3e0496 100644 --- a/mlair/configuration/defaults.py +++ b/mlair/configuration/defaults.py @@ -9,7 +9,6 @@ DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'] DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', 'pblheight': 'maximum'} -DEFAULT_NETWORK = "AIRBASE" DEFAULT_STATION_TYPE = "background" DEFAULT_VARIABLES = DEFAULT_VAR_ALL_DICT.keys() DEFAULT_START = "1997-01-01" diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index c26f97c..140bc8c 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -63,8 +63,8 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation): vars = [self.variables, self.target_var] stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind], - self.station_type, self.network, self.store_data_locally, self.data_origin, - self.start, self.end) + self.station_type, self.store_data_locally, self.data_origin, self.start, + self.end) data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind], limit=self.interpolation_limit[ind], sampling=self.sampling[ind]) @@ -147,8 +147,7 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind], - self.station_type, self.network, self.store_data_locally, self.data_origin, - start, end) + self.station_type, self.store_data_locally, self.data_origin, start, end) data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind], limit=self.interpolation_limit[ind], sampling=self.sampling[ind]) return data diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index 516fab7..e1bcd62 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -40,7 +40,6 @@ class DataHandlerSingleStation(AbstractDataHandler): is 0. 
""" DEFAULT_STATION_TYPE = "background" - DEFAULT_NETWORK = "AIRBASE" DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', 'pblheight': 'maximum'} @@ -59,16 +58,16 @@ class DataHandlerSingleStation(AbstractDataHandler): chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5", "propane", "so2", "toluene"] - _hash = ["station", "statistics_per_var", "data_origin", "station_type", "network", "sampling", "target_dim", - "target_var", "time_dim", "iter_dim", "window_dim", "window_history_size", "window_history_offset", - "window_lead_time", "interpolation_limit", "interpolation_method", "variables", "window_history_end"] + _hash = ["station", "statistics_per_var", "data_origin", "station_type", "sampling", "target_dim", "target_var", + "time_dim", "iter_dim", "window_dim", "window_history_size", "window_history_offset", "window_lead_time", + "interpolation_limit", "interpolation_method", "variables", "window_history_end"] def __init__(self, station, data_path, statistics_per_var=None, station_type=DEFAULT_STATION_TYPE, - network=DEFAULT_NETWORK, sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING, - target_dim=DEFAULT_TARGET_DIM, target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM, - iter_dim=DEFAULT_ITER_DIM, window_dim=DEFAULT_WINDOW_DIM, - window_history_size=DEFAULT_WINDOW_HISTORY_SIZE, window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET, - window_history_end=DEFAULT_WINDOW_HISTORY_END, window_lead_time=DEFAULT_WINDOW_LEAD_TIME, + sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING, target_dim=DEFAULT_TARGET_DIM, + target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM, iter_dim=DEFAULT_ITER_DIM, + window_dim=DEFAULT_WINDOW_DIM, window_history_size=DEFAULT_WINDOW_HISTORY_SIZE, + window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET, window_history_end=DEFAULT_WINDOW_HISTORY_END, + window_lead_time=DEFAULT_WINDOW_LEAD_TIME, interpolation_limit: Union[int, Tuple[int]] = DEFAULT_INTERPOLATION_LIMIT, interpolation_method: Union[str, Tuple[str]] = DEFAULT_INTERPOLATION_METHOD, overwrite_local_data: bool = False, transformation=None, store_data_locally: bool = True, @@ -89,7 +88,6 @@ class DataHandlerSingleStation(AbstractDataHandler): self._transformation = self.setup_transformation(transformation) self.station_type = station_type - self.network = network self.sampling = sampling self.target_dim = target_dim self.target_var = target_var @@ -141,9 +139,8 @@ class DataHandlerSingleStation(AbstractDataHandler): return self._data.shape, self.get_X().shape, self.get_Y().shape def __repr__(self): - return f"StationPrep(station={self.station}, data_path='{self.path}', " \ - f"statistics_per_var={self.statistics_per_var}, " \ - f"station_type='{self.station_type}', network='{self.network}', " \ + return f"StationPrep(station={self.station}, data_path='{self.path}', data_origin={self.data_origin}, " \ + f"statistics_per_var={self.statistics_per_var}, station_type='{self.station_type}', " \ f"sampling='{self.sampling}', target_dim='{self.target_dim}', target_var='{self.target_var}', " \ f"time_dim='{self.time_dim}', window_history_size={self.window_history_size}, " \ f"window_lead_time={self.window_lead_time}, interpolation_limit={self.interpolation_limit}, " \ @@ -170,8 +167,12 @@ class DataHandlerSingleStation(AbstractDataHandler): return self.get_transposed_label() def get_coordinates(self): - coords = 
self.meta.loc[["station_lon", "station_lat"]].astype(float) - return coords.rename(index={"station_lon": "lon", "station_lat": "lat"}).to_dict()[str(self)] + try: + coords = self.meta.loc[["station_lon", "station_lat"]].astype(float) + coords = coords.rename(index={"station_lon": "lon", "station_lat": "lat"}) + except KeyError: + coords = self.meta.loc[["lon", "lat"]].astype(float) + return coords.to_dict()[str(self)] def call_transform(self, inverse=False): opts_input = self._transformation[0] @@ -302,7 +303,7 @@ class DataHandlerSingleStation(AbstractDataHandler): def make_input_target(self): data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, - self.station_type, self.network, self.store_data_locally, self.data_origin, + self.station_type, self.store_data_locally, self.data_origin, self.start, self.end) self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit, sampling=self.sampling) @@ -321,8 +322,8 @@ class DataHandlerSingleStation(AbstractDataHandler): self.make_observation(self.target_dim, self.target_var, self.time_dim) self.remove_nan(self.time_dim) - def load_data(self, path, station, statistics_per_var, sampling, station_type=None, network=None, - store_data_locally=False, data_origin: Dict = None, start=None, end=None): + def load_data(self, path, station, statistics_per_var, sampling, station_type=None, store_data_locally=False, + data_origin: Dict = None, start=None, end=None): """ Load data and meta data either from local disk (preferred) or download new data by using a custom download method. @@ -340,9 +341,8 @@ class DataHandlerSingleStation(AbstractDataHandler): if os.path.exists(meta_file): os.remove(meta_file) data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling, - station_type=station_type, network=network, - store_data_locally=store_data_locally, data_origin=data_origin, - time_dim=self.time_dim, target_dim=self.target_dim, + station_type=station_type, store_data_locally=store_data_locally, + data_origin=data_origin, time_dim=self.time_dim, target_dim=self.target_dim, iter_dim=self.iter_dim) logging.debug(f"loaded new data") else: @@ -350,26 +350,24 @@ class DataHandlerSingleStation(AbstractDataHandler): logging.debug(f"try to load local data from: {file_name}") data = xr.open_dataarray(file_name) meta = pd.read_csv(meta_file, index_col=0) - self.check_station_meta(meta, station, station_type, network, data_origin) + self.check_station_meta(meta, station, station_type, data_origin) logging.debug("loading finished") except FileNotFoundError as e: logging.debug(e) logging.debug(f"load new data") data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling, - station_type=station_type, network=network, - store_data_locally=store_data_locally, data_origin=data_origin, - time_dim=self.time_dim, target_dim=self.target_dim, - iter_dim=self.iter_dim) + station_type=station_type, store_data_locally=store_data_locally, + data_origin=data_origin, time_dim=self.time_dim, + target_dim=self.target_dim, iter_dim=self.iter_dim) logging.debug("loading finished") # create slices and check for negative concentration. 
data = self._slice_prep(data, start=start, end=end) data = self.check_for_negative_concentrations(data) return data, meta - def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling, - station_type=None, network=None, store_data_locally=True, data_origin: Dict = None, - time_dim=DEFAULT_TIME_DIM, target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) \ - -> [xr.DataArray, pd.DataFrame]: + def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling, station_type=None, + store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM, + target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) -> [xr.DataArray, pd.DataFrame]: """ Download data from TOAR database using the JOIN interface or load local era5 data. @@ -382,8 +380,8 @@ class DataHandlerSingleStation(AbstractDataHandler): :return: downloaded data and its meta data """ df_all = {} - df_era5, df_toar, df_join = None, None, None - meta_era5, meta_toar, meta_join = None, None, None + df_era5, df_toar = None, None + meta_era5, meta_toar = None, None if data_origin is not None: era5_origin = filter_dict_by_value(data_origin, "era5", True) era5_stats = select_from_dict(statistics_per_var, era5_origin.keys()) @@ -401,23 +399,16 @@ class DataHandlerSingleStation(AbstractDataHandler): df_era5, meta_era5 = data_sources.era5.load_era5(station_name=station, stat_var=era5_stats, sampling=sampling, data_origin=era5_origin) if toar_origin is None or len(toar_stats) > 0: - # load combined ata from toar-data (v2 & v1) + # load combined data from toar-data (v2 & v1) df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats, sampling=sampling, data_origin=toar_origin, station_type=station_type) - # # load data from toar-data (v2) - # df_toar, meta_toar = toar_data.download_toar(station, toar_stats, sampling=sampling, data_origin=toar_origin) - # - # # load join data (toar-data v1) - # df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling, - # station_type=station_type, data_origin=toar_origin) - # - # # fill-up toar-data with join data - # a = 1 - df = pd.concat([df_era5, df_toar], axis=1, sort=True) - meta = meta_era5 if meta_era5 is not None else meta_toar + if meta_era5 is not None and meta_toar is not None: + meta = meta_era5.combine_first(meta_toar) + else: + meta = meta_era5 if meta_era5 is not None else meta_toar meta.loc["data_origin"] = str(data_origin) df_all[station[0]] = df @@ -431,16 +422,16 @@ class DataHandlerSingleStation(AbstractDataHandler): return xarr, meta @staticmethod - def check_station_meta(meta, station, station_type, network, data_origin): + def check_station_meta(meta, station, station_type, data_origin): """ Search for the entries in meta data and compare the value with the requested values. Will raise a FileNotFoundError if the values mismatch. 
""" if station_type is not None: - check_dict = {"station_type": station_type, "network_name": network, "data_origin": str(data_origin)} + check_dict = {"station_type": station_type, "type": station_type, "data_origin": str(data_origin)} for (k, v) in check_dict.items(): - if v is None or k not in meta: + if v is None or k not in meta.index: continue if meta.at[k, station[0]] != v: logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != " diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py index 47ccc55..6fbdc38 100644 --- a/mlair/data_handler/data_handler_with_filter.py +++ b/mlair/data_handler/data_handler_with_filter.py @@ -68,8 +68,8 @@ class DataHandlerFilterSingleStation(DataHandlerSingleStation): def make_input_target(self): data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, - self.station_type, self.network, self.store_data_locally, self.data_origin, - self.start, self.end) + self.station_type, self.store_data_locally, self.data_origin, self.start, + self.end) self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit) self.set_inputs_and_targets() diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index 8ba78f1..ae46ad9 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -168,7 +168,7 @@ class DefaultDataHandler(AbstractDataHandler): dim = self.time_dim intersect = reduce(np.intersect1d, map(lambda x: x.coords[dim].values, X_original)) if len(intersect) < max(self.min_length, 1): - X, Y = None, None + raise ValueError(f"There is no intersection of X.") else: X = list(map(lambda x: x.sel({dim: intersect}), X_original)) Y = Y_original.sel({dim: intersect}) @@ -205,10 +205,6 @@ class DefaultDataHandler(AbstractDataHandler): if True only extract values larger than extreme_values :param timedelta: used as arguments for np.timedelta in order to mark extreme values on datetime """ - # check if X or Y is None - if (self._X is None) or (self._Y is None): - logging.debug(f"{str(self.id_class)} has no data for X or Y, skip multiply extremes") - return if extreme_values is None: logging.debug(f"No extreme values given, skip multiply extremes") self._X_extreme, self._Y_extreme = self._X, self._Y diff --git a/mlair/helpers/data_sources/era5.py b/mlair/helpers/data_sources/era5.py index a4f60af..cd569f3 100644 --- a/mlair/helpers/data_sources/era5.py +++ b/mlair/helpers/data_sources/era5.py @@ -11,7 +11,7 @@ import xarray as xr from mlair import helpers from mlair.configuration.era5_settings import era5_settings from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings -from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data +from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data, correct_timezone from mlair.helpers.data_sources.toar_data import EmptyQueryResult from mlair.helpers.meteo import relative_humidity_from_dewpoint @@ -56,6 +56,9 @@ def load_era5(station_name, stat_var, sampling, data_origin): else: station_data = station_data[stat_var] + # convert to local timezone + station_data = correct_timezone(station_data, station_meta, sampling) + variable_meta = _emulate_meta_data(station_data) meta = combine_meta_data(station_meta, variable_meta) meta = pd.DataFrame.from_dict(meta, orient='index') 
diff --git a/mlair/helpers/data_sources/join.py b/mlair/helpers/data_sources/join.py index 0ae1af1..df3b358 100644 --- a/mlair/helpers/data_sources/join.py +++ b/mlair/helpers/data_sources/join.py @@ -10,7 +10,7 @@ import pandas as pd from mlair import helpers from mlair.configuration.join_settings import join_settings -from mlair.helpers.data_sources import toar_data +from mlair.helpers.data_sources import toar_data, toar_data_v2 # join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/' @@ -59,6 +59,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t # download all variables with given statistic data = None df = None + meta = {} logging.info(f"load data for {station_name[0]} from JOIN") for var in _lower_list(sorted(vars_dict.keys())): if var in stat_var.keys(): @@ -83,17 +84,53 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t # store data in pandas dataframe df = _save_to_pandas(df, data, stat, var) + meta[var] = _correct_meta(data["metadata"]) logging.debug('finished: {}'.format(var)) if data: - meta = pd.DataFrame.from_dict(data['metadata'], orient='index') + # load station meta using toar-data v2 API and convert to local timezone + meta_url_base, headers = toar_data_v2.toar_data_v2_settings("meta") + station_meta = toar_data_v2.load_station_information(station_name, meta_url_base, headers) + df = toar_data_v2.correct_timezone(df, station_meta, sampling) + + # create meta data + meta = toar_data_v2.combine_meta_data(station_meta, meta) + meta = pd.DataFrame.from_dict(meta, orient='index') meta.columns = station_name return df, meta else: raise toar_data.EmptyQueryResult("No data found in JOIN.") +def _correct_meta(meta): + meta_out = {} + for k, v in meta.items(): + if k.startswith("station"): + _k = k.split("_", 1)[1] + _d = meta_out.get("station", {}) + _d[_k] = v + meta_out["station"] = _d + elif k.startswith("parameter"): + _k = k.split("_", 1)[1] + _d = meta_out.get("variable", {}) + _d[_k] = v + meta_out["variable"] = _d + elif k == "network_name": + if v == "AIRBASE": + _d = {"name": "EEA", "longname": "European Environment Agency", "kind": "government"} + elif v == "UBA": + _d = {"name": "UBA", "longname": "Umweltbundesamt", "kind": "government", "country": "Germany"} + else: + _d = {"name": v} + meta_out["roles"] = [{"contact": {"organisation": _d}}] + elif k in ["google_resolution", "numid"]: + continue + else: + meta_out[k] = v + return meta_out + + def split_network_and_origin(origin_network_dict: dict) -> Tuple[Union[None, dict], Union[None, dict]]: """ Split given dict into network and data origin. 
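The new _correct_meta helper above regroups JOIN's flat metadata keys ("station_lon", "parameter_name", "network_name", ...) into the nested station/variable/roles layout that toar-data v2 delivers, so combine_meta_data can treat both sources alike. A reduced sketch of that key-splitting idea; the function name and example keys are illustrative and not the patch code:

def nest_flat_meta(flat: dict) -> dict:
    nested = {}
    for key, value in flat.items():
        if key.startswith("station_"):
            nested.setdefault("station", {})[key.split("_", 1)[1]] = value
        elif key.startswith("parameter_"):
            nested.setdefault("variable", {})[key.split("_", 1)[1]] = value
        else:
            nested[key] = value  # anything else is passed through unchanged
    return nested

# nest_flat_meta({"station_lon": 6.41, "station_alt": 45, "parameter_name": "o3"})
# -> {"station": {"lon": 6.41, "alt": 45}, "variable": {"name": "o3"}}
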
@@ -157,24 +194,6 @@ def correct_data_format(data): return formatted -def load_meta_data(station_name: List[str], station_type: str_or_none, network_name: str_or_none, - join_url_base: str, headers: Dict) -> [Dict, Dict]: - opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type, - "network_name": network_name, "as_dict": "true", - "columns": "station_id,network_name,station_local_id,station_type,station_type_of_area,station_category," - "station_name,station_country,station_state,station_lon,station_lat,station_alt," - "station_timezone,station_nightlight_5km,station_climatic_zone,station_wheat_production," - "station_rice_production,station_nox_emissions,station_omi_no2_column,station_toar_category," - "station_htap_region,station_reported_alt,station_alt_flag,station_coordinate_status," - "station_google_alt,station_etopo_alt,station_etopo_min_alt_5km,station_etopo_relative_alt," - "station_dominant_landcover,station_landcover_description,station_max_nightlight_25km," - "station_max_population_density_25km,station_nightlight_1km,station_population_density," - "google_resolution,station_comments,station_max_population_density_5km"} - if network_name is None: - opts["columns"] = opts["columns"].replace(",network_name", "") - return toar_data.get_data(opts, headers)[-1] - - def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none, join_url_base: str, headers: Dict, data_origin: Dict = None, stat_var: Dict = None) -> [Dict, Dict]: """ diff --git a/mlair/helpers/data_sources/toar_data.py b/mlair/helpers/data_sources/toar_data.py index 70d6223..b01fb3f 100644 --- a/mlair/helpers/data_sources/toar_data.py +++ b/mlair/helpers/data_sources/toar_data.py @@ -9,6 +9,7 @@ from . 
import join, toar_data_v2 import requests from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry +import pandas as pd class EmptyQueryResult(Exception): @@ -29,9 +30,11 @@ def create_url(base: str, service: str, param_id: Union[str, int, None] = None, :return: combined url as string """ - if not base.endswith("/"): - base += "/" - url = f"{base}{service}" + url = f"{base}" + if not url.endswith("/"): + url += "/" + if service is not None: + url = f"{url}{service}" if not url.endswith("/"): url += "/" if param_id is not None: @@ -79,11 +82,19 @@ def download_toar(station, toar_stats, sampling, data_origin, station_type=None df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=data_origin) # load join data (toar-data v1) - df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling, - station_type=station_type, data_origin=data_origin) - - return df_toar - - -def merge_toar_join(df_toar, df_join): - start_date = min([df_toar.index.min(), df_join.index.min()]) \ No newline at end of file + df_join, _ = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling, + station_type=station_type, data_origin=data_origin) + + # merge both data sources with priority on toar-data v2 + df_merged = merge_toar_join(df_toar, df_join, sampling) + return df_merged, meta_toar + + +def merge_toar_join(df_toar, df_join, sampling): + start_date = min([df_toar.index.min(), df_join.index.min()]) + end_date = max([df_toar.index.max(), df_join.index.max()]) + freq = {"hourly": "1H", "daily": "1d"}.get(sampling) + full_time = pd.date_range(start_date, end_date, freq=freq) + full_data = df_toar.reindex(full_time) + full_data.update(df_join, overwrite=False) + return full_data diff --git a/mlair/helpers/data_sources/toar_data_v2.py b/mlair/helpers/data_sources/toar_data_v2.py index bf85dd9..5f8e831 100644 --- a/mlair/helpers/data_sources/toar_data_v2.py +++ b/mlair/helpers/data_sources/toar_data_v2.py @@ -8,6 +8,8 @@ from typing import Union, List, Dict from io import StringIO import pandas as pd +import pytz +from timezonefinder import TimezoneFinder from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings from mlair.helpers import to_list @@ -65,10 +67,13 @@ def download_toar(station_name: Union[str, List[str]], stat_var: dict, data_dict = {} for var, meta in timeseries_meta.items(): logging.debug(f"load {var}") - meta, opts = prepare_meta(meta, sampling, stat_var, var) - data_var = load_timeseries_data([meta[0]], data_url_base, opts, headers)[0] - data_dict[var] = data_var + meta_and_opts = prepare_meta(meta, sampling, stat_var, var) + data_var = [] + for var_meta, opts in meta_and_opts: + data_var.extend(load_timeseries_data(var_meta, data_url_base, opts, headers, sampling)) + data_dict[var] = merge_data(*data_var, sampling=sampling) data = pd.DataFrame.from_dict(data_dict) + data = correct_timezone(data, station_meta, sampling) meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()}) meta = pd.DataFrame.from_dict(meta, orient='index') @@ -76,15 +81,52 @@ def download_toar(station_name: Union[str, List[str]], stat_var: dict, return data, meta +def merge_data(*args, sampling="hourly"): + start_date = min(map(lambda x: x.index.min(), args)) + end_date = max(map(lambda x: x.index.max(), args)) + freq = {"hourly": "1H", "daily": "1d"}.get(sampling) + full_time = pd.date_range(start_date, end_date, freq=freq) + full_data = 
args[0].reindex(full_time) + if not isinstance(full_data, pd.DataFrame): + full_data = full_data.to_frame() + for d in args[1:]: + full_data.update(d, overwrite=False) + return full_data.squeeze() + + +def correct_timezone(data, meta, sampling): + """ + Extract timezone information and convert data index to this timezone. + + Uses UTC if no information is provided. Note that is method only modifies data in with sampling='hourly'. In all + other cases, it returns just the given data without any change. This method expects date index of data to be in UTC. + Timezone information is not added to the index to get rid of daylight saving time and ambiguous timestamps. + """ + if sampling == "hourly": + tz_info = meta.get("timezone", "UTC") + try: + tz = pytz.timezone(tz_info) + except pytz.exceptions.UnknownTimeZoneError as e: + lon, lat = meta["coordinates"]["lng"], meta["coordinates"]["lat"] + tz = pytz.timezone(TimezoneFinder().timezone_at(lng=lon, lat=lat)) + index = data.index + index = index.tz_localize(None) + utc_offset = tz.utcoffset(index[0]) - tz.dst(index[0]) + data.index = index + utc_offset + return data + + def prepare_meta(meta, sampling, stat_var, var): - meta = meta[0] - opts = {} - if sampling == "daily": - opts["timeseries_id"] = meta.pop("id") - meta["id"] = None - opts["names"] = stat_var[var] - opts["sampling"] = sampling - return [meta], opts + out = [] + for m in meta: + opts = {} + if sampling == "daily": + opts["timeseries_id"] = m.pop("id") + m["id"] = None + opts["names"] = stat_var[var] + opts["sampling"] = sampling + out.append(([m], opts)) + return out def combine_meta_data(station_meta, timeseries_meta): @@ -120,16 +162,18 @@ def combine_meta_data(station_meta, timeseries_meta): return meta -def load_timeseries_data(timeseries_meta, url_base, opts, headers): +def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling): coll = [] for meta in timeseries_meta: series_id = meta["id"] # opts = {"base": url_base, "service": f"data/timeseries/{series_id}"} opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv", **opts} + if sampling != "hourly": + opts["service"] = None res = get_data(opts, headers, as_json=False) data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True, infer_datetime_format=True) - data = data["value"].rename(meta["variable"]["name"]) + data = data[opts.get("names", "value")].rename(meta["variable"]["name"]) coll.append(data) return coll diff --git a/mlair/run_modules/experiment_setup.py b/mlair/run_modules/experiment_setup.py index d807db1..5e7efde 100644 --- a/mlair/run_modules/experiment_setup.py +++ b/mlair/run_modules/experiment_setup.py @@ -10,7 +10,7 @@ from dill.source import getsource from mlair.configuration import path_config from mlair import helpers -from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_NETWORK, DEFAULT_STATION_TYPE, \ +from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_STATION_TYPE, \ DEFAULT_START, DEFAULT_END, DEFAULT_WINDOW_HISTORY_SIZE, DEFAULT_OVERWRITE_LOCAL_DATA, \ DEFAULT_HPC_LOGIN_LIST, DEFAULT_HPC_HOST_LIST, DEFAULT_CREATE_NEW_MODEL, DEFAULT_TRAIN_MODEL, \ DEFAULT_FRACTION_OF_TRAINING, DEFAULT_EXTREME_VALUES, DEFAULT_EXTREMES_ON_RIGHT_TAIL_ONLY, DEFAULT_PERMUTE_DATA, \ diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py index 8c5080f..7dc61f8 100644 --- a/mlair/run_modules/post_processing.py +++ 
b/mlair/run_modules/post_processing.py @@ -13,6 +13,7 @@ from typing import Dict, Tuple, Union, List, Callable import numpy as np import pandas as pd import xarray as xr +import datetime as dt from mlair.configuration import path_config from mlair.data_handler import Bootstraps, KerasIterator diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py index de70002..7fb272f 100644 --- a/mlair/run_modules/pre_processing.py +++ b/mlair/run_modules/pre_processing.py @@ -114,8 +114,8 @@ class PreProcessing(RunEnvironment): +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+ """ - meta_cols = ['station_name', 'station_lon', 'station_lat', 'station_alt'] - meta_round = ["station_lon", "station_lat", "station_alt"] + meta_cols = ["name", "lat", "lon", "alt", "country", "state", "type", "type_of_area", "toar1_category"] + meta_round = ["lat", "lon", "alt"] precision = 4 path = os.path.join(self.data_store.get("experiment_path"), "latex_report") path_config.check_path_and_create(path) @@ -402,8 +402,9 @@ def f_proc(data_handler, station, name_affix, store, return_strategy="", tmp_pat def f_proc_create_info_df(data, meta_cols): station_name = str(data.id_class) + meta = data.id_class.meta res = {"station_name": station_name, "Y_shape": data.get_Y()[0].shape[0], - "meta": data.id_class.meta.loc[meta_cols].values.flatten()} + "meta": meta.reindex(meta_cols).values.flatten()} return res diff --git a/requirements.txt b/requirements.txt index 3afc17b..353c009 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,6 +28,7 @@ six==1.15.0 statsmodels==0.12.2 tabulate==0.8.9 tensorflow==2.5.0 +timezonefinder==5.2.0 toolz==0.11.2 typing_extensions==3.7.4.3 wget==3.2 -- GitLab From 70ed13c6abcedd455089225260b5b78ac65aea7f Mon Sep 17 00:00:00 2001 From: leufen1 Date: Thu, 7 Jul 2022 16:34:06 +0200 Subject: [PATCH 07/11] fix tests --- mlair/helpers/data_sources/toar_data_v2.py | 3 ++- test/test_configuration/test_defaults.py | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mlair/helpers/data_sources/toar_data_v2.py b/mlair/helpers/data_sources/toar_data_v2.py index 5f8e831..a592d2d 100644 --- a/mlair/helpers/data_sources/toar_data_v2.py +++ b/mlair/helpers/data_sources/toar_data_v2.py @@ -14,6 +14,7 @@ from timezonefinder import TimezoneFinder from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings from mlair.helpers import to_list from mlair.helpers.data_sources.toar_data import EmptyQueryResult, get_data +from mlair.helpers.data_sources import join str_or_none = Union[str, None] @@ -173,7 +174,7 @@ def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling): res = get_data(opts, headers, as_json=False) data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True, infer_datetime_format=True) - data = data[opts.get("names", "value")].rename(meta["variable"]["name"]) + data = data[join._correct_stat_name(opts.get("names", "value"))].rename(meta["variable"]["name"]) coll.append(data) return coll diff --git a/test/test_configuration/test_defaults.py b/test/test_configuration/test_defaults.py index 07a5aa2..b465902 100644 --- a/test/test_configuration/test_defaults.py +++ b/test/test_configuration/test_defaults.py @@ -31,7 +31,6 @@ class TestAllDefaults: 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', 'pblheight': 'maximum'} - assert DEFAULT_NETWORK == "AIRBASE" assert 
DEFAULT_STATION_TYPE == "background" assert DEFAULT_VARIABLES == DEFAULT_VAR_ALL_DICT.keys() assert DEFAULT_START == "1997-01-01" -- GitLab From 47a44e94eee854b4a3041c885436d978efadd721 Mon Sep 17 00:00:00 2001 From: leufen1 Date: Thu, 7 Jul 2022 19:07:53 +0200 Subject: [PATCH 08/11] try to prevent break down on download toar --- mlair/helpers/data_sources/toar_data.py | 27 +++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/mlair/helpers/data_sources/toar_data.py b/mlair/helpers/data_sources/toar_data.py index b01fb3f..fc4b8e7 100644 --- a/mlair/helpers/data_sources/toar_data.py +++ b/mlair/helpers/data_sources/toar_data.py @@ -78,16 +78,27 @@ def retries_session(max_retries=3): def download_toar(station, toar_stats, sampling, data_origin, station_type=None): - # load data from toar-data (v2) - df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=data_origin) - - # load join data (toar-data v1) - df_join, _ = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling, - station_type=station_type, data_origin=data_origin) + try: + # load data from toar-data (v2) + df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=data_origin) + except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError, IndexError): + df_toar, meta_toar = None, None + + try: + # load join data (toar-data v1) + df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling, + station_type=station_type, data_origin=data_origin) + except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError, IndexError): + df_join, meta_join = None, None # merge both data sources with priority on toar-data v2 - df_merged = merge_toar_join(df_toar, df_join, sampling) - return df_merged, meta_toar + if df_toar is not None and df_join is not None: + df_merged = merge_toar_join(df_toar, df_join, sampling) + meta_merged = meta_toar + else: + df_merged = df_toar if df_toar is not None else df_join + meta_merged = meta_toar if df_toar is not None else meta_join + return df_merged, meta_merged def merge_toar_join(df_toar, df_join, sampling): -- GitLab From afd914212b692f1fb92d05db824f29b38db19d62 Mon Sep 17 00:00:00 2001 From: leufen1 Date: Fri, 8 Jul 2022 11:21:32 +0200 Subject: [PATCH 09/11] remove station_type parameter, add stats_var to meta check --- .../data_handler_mixed_sampling.py | 5 +- .../data_handler_single_station.py | 64 +++++++++---------- .../data_handler/data_handler_with_filter.py | 3 +- mlair/helpers/data_sources/toar_data.py | 4 +- 4 files changed, 34 insertions(+), 42 deletions(-) diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index 140bc8c..eaa6a21 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -63,8 +63,7 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation): vars = [self.variables, self.target_var] stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind], - self.station_type, self.store_data_locally, self.data_origin, self.start, - self.end) + self.store_data_locally, self.data_origin, self.start, self.end) data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind], 
limit=self.interpolation_limit[ind], sampling=self.sampling[ind]) @@ -147,7 +146,7 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind], - self.station_type, self.store_data_locally, self.data_origin, start, end) + self.store_data_locally, self.data_origin, start, end) data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind], limit=self.interpolation_limit[ind], sampling=self.sampling[ind]) return data diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index e1bcd62..d23c9f2 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -39,7 +39,6 @@ class DataHandlerSingleStation(AbstractDataHandler): indicates that not all values up to t0 are used, a positive values indicates usage of values at t>t0. Default is 0. """ - DEFAULT_STATION_TYPE = "background" DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', 'pblheight': 'maximum'} @@ -58,16 +57,15 @@ class DataHandlerSingleStation(AbstractDataHandler): chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5", "propane", "so2", "toluene"] - _hash = ["station", "statistics_per_var", "data_origin", "station_type", "sampling", "target_dim", "target_var", - "time_dim", "iter_dim", "window_dim", "window_history_size", "window_history_offset", "window_lead_time", + _hash = ["station", "statistics_per_var", "data_origin", "sampling", "target_dim", "target_var", "time_dim", + "iter_dim", "window_dim", "window_history_size", "window_history_offset", "window_lead_time", "interpolation_limit", "interpolation_method", "variables", "window_history_end"] - def __init__(self, station, data_path, statistics_per_var=None, station_type=DEFAULT_STATION_TYPE, - sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING, target_dim=DEFAULT_TARGET_DIM, - target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM, iter_dim=DEFAULT_ITER_DIM, - window_dim=DEFAULT_WINDOW_DIM, window_history_size=DEFAULT_WINDOW_HISTORY_SIZE, - window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET, window_history_end=DEFAULT_WINDOW_HISTORY_END, - window_lead_time=DEFAULT_WINDOW_LEAD_TIME, + def __init__(self, station, data_path, statistics_per_var=None, sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING, + target_dim=DEFAULT_TARGET_DIM, target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM, + iter_dim=DEFAULT_ITER_DIM, window_dim=DEFAULT_WINDOW_DIM, + window_history_size=DEFAULT_WINDOW_HISTORY_SIZE, window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET, + window_history_end=DEFAULT_WINDOW_HISTORY_END, window_lead_time=DEFAULT_WINDOW_LEAD_TIME, interpolation_limit: Union[int, Tuple[int]] = DEFAULT_INTERPOLATION_LIMIT, interpolation_method: Union[str, Tuple[str]] = DEFAULT_INTERPOLATION_METHOD, overwrite_local_data: bool = False, transformation=None, store_data_locally: bool = True, @@ -87,7 +85,6 @@ class DataHandlerSingleStation(AbstractDataHandler): self.input_data, self.target_data = None, None self._transformation = self.setup_transformation(transformation) - self.station_type = station_type self.sampling = sampling self.target_dim = target_dim 
self.target_var = target_var @@ -140,7 +137,7 @@ class DataHandlerSingleStation(AbstractDataHandler): def __repr__(self): return f"StationPrep(station={self.station}, data_path='{self.path}', data_origin={self.data_origin}, " \ - f"statistics_per_var={self.statistics_per_var}, station_type='{self.station_type}', " \ + f"statistics_per_var={self.statistics_per_var}, " \ f"sampling='{self.sampling}', target_dim='{self.target_dim}', target_var='{self.target_var}', " \ f"time_dim='{self.time_dim}', window_history_size={self.window_history_size}, " \ f"window_lead_time={self.window_lead_time}, interpolation_limit={self.interpolation_limit}, " \ @@ -303,8 +300,7 @@ class DataHandlerSingleStation(AbstractDataHandler): def make_input_target(self): data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, - self.station_type, self.store_data_locally, self.data_origin, - self.start, self.end) + self.store_data_locally, self.data_origin, self.start, self.end) self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit, sampling=self.sampling) self.set_inputs_and_targets() @@ -322,7 +318,7 @@ class DataHandlerSingleStation(AbstractDataHandler): self.make_observation(self.target_dim, self.target_var, self.time_dim) self.remove_nan(self.time_dim) - def load_data(self, path, station, statistics_per_var, sampling, station_type=None, store_data_locally=False, + def load_data(self, path, station, statistics_per_var, sampling, store_data_locally=False, data_origin: Dict = None, start=None, end=None): """ Load data and meta data either from local disk (preferred) or download new data by using a custom download method. @@ -341,31 +337,30 @@ class DataHandlerSingleStation(AbstractDataHandler): if os.path.exists(meta_file): os.remove(meta_file) data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling, - station_type=station_type, store_data_locally=store_data_locally, - data_origin=data_origin, time_dim=self.time_dim, target_dim=self.target_dim, - iter_dim=self.iter_dim) + store_data_locally=store_data_locally, data_origin=data_origin, + time_dim=self.time_dim, target_dim=self.target_dim, iter_dim=self.iter_dim) logging.debug(f"loaded new data") else: try: logging.debug(f"try to load local data from: {file_name}") data = xr.open_dataarray(file_name) meta = pd.read_csv(meta_file, index_col=0) - self.check_station_meta(meta, station, station_type, data_origin) + self.check_station_meta(meta, station, data_origin, statistics_per_var) logging.debug("loading finished") except FileNotFoundError as e: logging.debug(e) logging.debug(f"load new data") data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling, - station_type=station_type, store_data_locally=store_data_locally, - data_origin=data_origin, time_dim=self.time_dim, - target_dim=self.target_dim, iter_dim=self.iter_dim) + store_data_locally=store_data_locally, data_origin=data_origin, + time_dim=self.time_dim, target_dim=self.target_dim, + iter_dim=self.iter_dim) logging.debug("loading finished") # create slices and check for negative concentration. 
data = self._slice_prep(data, start=start, end=end) data = self.check_for_negative_concentrations(data) return data, meta - def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling, station_type=None, + def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling, store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM, target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) -> [xr.DataArray, pd.DataFrame]: """ @@ -401,8 +396,7 @@ class DataHandlerSingleStation(AbstractDataHandler): if toar_origin is None or len(toar_stats) > 0: # load combined data from toar-data (v2 & v1) df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats, - sampling=sampling, data_origin=toar_origin, - station_type=station_type) + sampling=sampling, data_origin=toar_origin) df = pd.concat([df_era5, df_toar], axis=1, sort=True) if meta_era5 is not None and meta_toar is not None: @@ -410,6 +404,7 @@ class DataHandlerSingleStation(AbstractDataHandler): else: meta = meta_era5 if meta_era5 is not None else meta_toar meta.loc["data_origin"] = str(data_origin) + meta.loc["statistics_per_var"] = str(statistics_per_var) df_all[station[0]] = df # convert df_all to xarray @@ -422,22 +417,21 @@ class DataHandlerSingleStation(AbstractDataHandler): return xarr, meta @staticmethod - def check_station_meta(meta, station, station_type, data_origin): + def check_station_meta(meta, station, data_origin, statistics_per_var): """ Search for the entries in meta data and compare the value with the requested values. Will raise a FileNotFoundError if the values mismatch. """ - if station_type is not None: - check_dict = {"station_type": station_type, "type": station_type, "data_origin": str(data_origin)} - for (k, v) in check_dict.items(): - if v is None or k not in meta.index: - continue - if meta.at[k, station[0]] != v: - logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != " - f"{meta.at[k, station[0]]} (local). Raise FileNotFoundError to trigger new " - f"grapping from web.") - raise FileNotFoundError + check_dict = {"data_origin": str(data_origin), "statistics_per_var": str(statistics_per_var)} + for (k, v) in check_dict.items(): + if v is None or k not in meta.index: + continue + if meta.at[k, station[0]] != v: + logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != " + f"{meta.at[k, station[0]]} (local). 
Raise FileNotFoundError to trigger new " + f"grapping from web.") + raise FileNotFoundError def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray: """ diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py index 6fbdc38..e5760e9 100644 --- a/mlair/data_handler/data_handler_with_filter.py +++ b/mlair/data_handler/data_handler_with_filter.py @@ -68,8 +68,7 @@ class DataHandlerFilterSingleStation(DataHandlerSingleStation): def make_input_target(self): data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, - self.station_type, self.store_data_locally, self.data_origin, self.start, - self.end) + self.store_data_locally, self.data_origin, self.start, self.end) self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit) self.set_inputs_and_targets() diff --git a/mlair/helpers/data_sources/toar_data.py b/mlair/helpers/data_sources/toar_data.py index fc4b8e7..9f9c595 100644 --- a/mlair/helpers/data_sources/toar_data.py +++ b/mlair/helpers/data_sources/toar_data.py @@ -76,7 +76,7 @@ def retries_session(max_retries=3): return http -def download_toar(station, toar_stats, sampling, data_origin, station_type=None): +def download_toar(station, toar_stats, sampling, data_origin): try: # load data from toar-data (v2) @@ -87,7 +87,7 @@ def download_toar(station, toar_stats, sampling, data_origin, station_type=None try: # load join data (toar-data v1) df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling, - station_type=station_type, data_origin=data_origin) + data_origin=data_origin) except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError, IndexError): df_join, meta_join = None, None -- GitLab From cd47dfa7ef5d3defa9a7d256df83d63f6ae95602 Mon Sep 17 00:00:00 2001 From: lukas leufen Date: Fri, 8 Jul 2022 12:38:04 +0200 Subject: [PATCH 10/11] apply proposed changes --- mlair/helpers/filter.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index 247c4fc..5fc3df9 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -214,6 +214,7 @@ class ClimateFIRFilter(FIRFilter): h = [] if self.sel_opts is not None: self.sel_opts = self.sel_opts if isinstance(self.sel_opts, dict) else {self.time_dim: self.sel_opts} + self._check_sel_opts() sampling = {1: "1d", 24: "1H"}.get(int(self.fs)) logging.debug(f"{self.display_name}: create diurnal_anomalies") if self.apriori_diurnal is True and sampling == "1H": @@ -303,6 +304,10 @@ class ClimateFIRFilter(FIRFilter): except Exception as e: logging.info(f"Could not plot climate fir filter due to following reason:\n{e}") + def _check_sel_opts(self): + if len(self.data.sel(**self.sel_opts).coords[self.time_dim]) == 0: + raise ValueError(f"Abort {self.__class__.__name__} as no data is available after applying sel_opts to data") + @staticmethod def _next_order(order: list, minimum_length: Union[int, None], pos: int, window: Union[str, tuple]) -> int: next_order = 0 -- GitLab From 9b59692a2013f722e40b59bf204d020967f7242e Mon Sep 17 00:00:00 2001 From: leufen1 Date: Fri, 8 Jul 2022 15:20:52 +0200 Subject: [PATCH 11/11] update tests, ready for merge --- test/test_run_modules/test_pre_processing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_run_modules/test_pre_processing.py 
b/test/test_run_modules/test_pre_processing.py index 1dafdbd..4618a5e 100644 --- a/test/test_run_modules/test_pre_processing.py +++ b/test/test_run_modules/test_pre_processing.py @@ -30,7 +30,7 @@ class TestPreProcessing: @pytest.fixture def obj_with_exp_setup(self): - ExperimentSetup(stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW001'], + ExperimentSetup(stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW99X'], statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}, station_type="background", data_handler=DefaultDataHandler) pre = object.__new__(PreProcessing) @@ -87,7 +87,7 @@ class TestPreProcessing: def test_create_set_split_all_stations(self, caplog, obj_with_exp_setup): caplog.set_level(logging.DEBUG) obj_with_exp_setup.create_set_split(slice(0, 2), "awesome") - message = "Awesome stations (len=6): ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW001']" + message = "Awesome stations (len=6): ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW99X']" assert ('root', 10, message) in caplog.record_tuples data_store = obj_with_exp_setup.data_store assert isinstance(data_store.get("data_collection", "general.awesome"), DataCollection) -- GitLab
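Taken together, this series lets download_toar fetch from both toar-data v2 and JOIN (v1), giving v2 priority and filling remaining gaps from v1 via reindex plus DataFrame.update(overwrite=False), as in merge_toar_join above. A self-contained sketch of that merge pattern; the function name merge_with_priority is illustrative and not part of the patch:

import pandas as pd

def merge_with_priority(primary: pd.DataFrame, secondary: pd.DataFrame, freq: str = "1H") -> pd.DataFrame:
    # span the union of both time ranges, keep primary values, fill NaNs from secondary
    start = min(primary.index.min(), secondary.index.min())
    end = max(primary.index.max(), secondary.index.max())
    merged = primary.reindex(pd.date_range(start, end, freq=freq))
    merged.update(secondary, overwrite=False)
    return merged

With hourly sampling this reproduces the behaviour of merge_toar_join; for daily data the frequency would be "1d", matching the sampling-to-frequency mapping used throughout the patch.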