diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py
index cb2c28a9eb1788021118e4df4a826fca100c86ac..690a44ff9f525a35890a5d3d48c5141309f222d5 100644
--- a/mlair/data_handler/data_handler_single_station.py
+++ b/mlair/data_handler/data_handler_single_station.py
@@ -382,31 +382,31 @@ class DataHandlerSingleStation(AbstractDataHandler):
         :return: downloaded data and its meta data
         """
         df_all = {}
-        df_era5, df_join = None, None
-        meta_era5, meta_join = None, None
+        df_era5, df_toar = None, None
+        meta_era5, meta_toar = None, None
         if data_origin is not None:
             era5_origin = filter_dict_by_value(data_origin, "era5", True)
             era5_stats = select_from_dict(statistics_per_var, era5_origin.keys())
-            join_origin = filter_dict_by_value(data_origin, "era5", False)
-            join_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False)
-            assert len(era5_origin) + len(join_origin) == len(data_origin)
-            assert len(era5_stats) + len(join_stats) == len(statistics_per_var)
+            toar_origin = filter_dict_by_value(data_origin, "era5", False)
+            toar_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False)
+            assert len(era5_origin) + len(toar_origin) == len(data_origin)
+            assert len(era5_stats) + len(toar_stats) == len(statistics_per_var)
         else:
-            era5_origin, join_origin = None, None
-            era5_stats, join_stats = statistics_per_var, statistics_per_var
+            era5_origin, toar_origin = None, None
+            era5_stats, toar_stats = statistics_per_var, statistics_per_var
 
         # load data
         if era5_origin is not None and len(era5_stats) > 0:
             # load era5 data
             df_era5, meta_era5 = era5.load_era5(station_name=station, stat_var=era5_stats, sampling=sampling,
                                                 data_origin=era5_origin)
-        if join_origin is None or len(join_stats) > 0:
+        if toar_origin is None or len(toar_stats) > 0:
             # load join data
-            _ = toar_data_v2.download_toar(station, join_stats, sampling=sampling, data_origin=join_origin)
-            df_join, meta_join = join.download_join(station_name=station, stat_var=join_stats, station_type=station_type,
-                                                    network_name=network, sampling=sampling, data_origin=join_origin)
-        df = pd.concat([df_era5, df_join], axis=1, sort=True)
-        meta = meta_era5 if meta_era5 is not None else meta_join
+            # df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=toar_origin)
+            df_toar, meta_toar = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling,
+                                                    station_type=station_type, data_origin=toar_origin)
+        df = pd.concat([df_era5, df_toar], axis=1, sort=True)
+        meta = meta_era5 if meta_era5 is not None else meta_toar
         meta.loc["data_origin"] = str(data_origin)
         df_all[station[0]] = df
 
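Note on the hunk above: the result of `join.download_join` must be bound to `df_toar`/`meta_toar` (fixed here), otherwise the `pd.concat` below always sees `None`. The era5/toar split relies on `filter_dict_by_value` keeping entries whose value equals "era5" and on `select_from_dict` keeping (or, with `filter_cond=False`, dropping) the given keys. A minimal sketch of the intended invariant, with made-up variables and origins; the helper calls are shown as comments and their expected results spelled out as literals:

```python
# Illustrative input (variable names, origins, and statistics are examples only).
data_origin = {"temp": "era5", "press": "era5", "o3": "", "no2": "UBA"}
statistics_per_var = {"temp": "average_values", "press": "average_values",
                      "o3": "dma8eu", "no2": "dma8eu"}

# era5_origin = filter_dict_by_value(data_origin, "era5", True)
era5_origin = {"temp": "era5", "press": "era5"}
# toar_origin = filter_dict_by_value(data_origin, "era5", False)
toar_origin = {"o3": "", "no2": "UBA"}
# era5_stats = select_from_dict(statistics_per_var, era5_origin.keys())
era5_stats = {"temp": "average_values", "press": "average_values"}
# toar_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False)
toar_stats = {"o3": "dma8eu", "no2": "dma8eu"}

# the two asserts from the patch hold on this split
assert len(era5_origin) + len(toar_origin) == len(data_origin)
assert len(era5_stats) + len(toar_stats) == len(statistics_per_var)
```

The renamed `toar_*` variables make clear that everything not served by ERA5 is requested from TOAR, regardless of whether JOIN (v1) or toar-data v2 answers the request.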
diff --git a/mlair/helpers/era5.py b/mlair/helpers/era5.py
index 8cf967a2c0ae65ff76d9464433266c2fdfc6ef4f..e0fb0746e5763f7df141b2cb85ab9df5a1d7f97e 100644
--- a/mlair/helpers/era5.py
+++ b/mlair/helpers/era5.py
@@ -36,9 +36,9 @@ def load_era5(station_name, stat_var, sampling, data_origin):
     meta = load_meta_data(station_name, None, None, join_url_base, headers)
 
     # sel data for station using sel method nearest
-    data = xr.open_mfdataset(os.path.join(data_path, file_names))
-    station_dask = data.sel(lon=meta["station_lon"], lat=meta["station_lat"], method="nearest", drop=True)
-    station_data = station_dask.to_array().T.compute()
+    with xr.open_mfdataset(os.path.join(data_path, file_names)) as data:
+        station_dask = data.sel(lon=meta["station_lon"], lat=meta["station_lat"], method="nearest", drop=True)
+        station_data = station_dask.to_array().T.compute()
 
     # transform data and meta to pandas
     station_data = station_data.to_pandas()
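Wrapping `open_mfdataset` in a `with` block guarantees that the NetCDF file handles are released again; the `.compute()` therefore has to happen inside the block, before the files are closed. A standalone sketch of the same pattern (path and coordinates are placeholders):

```python
import xarray as xr

# open all ERA5 files lazily (placeholder path)
with xr.open_mfdataset("/path/to/era5/*.nc") as data:
    # nearest-neighbour selection at the station location; still lazy (dask-backed)
    station = data.sel(lon=6.41, lat=50.91, method="nearest", drop=True)
    # materialize while the files are still open
    station_data = station.to_array().T.compute()

# the file handles are released here, but station_data lives in memory
print(station_data.dims)
```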
diff --git a/mlair/helpers/join.py b/mlair/helpers/join.py
index f7f204acf8c13dd3c85e2dfbce7e4adc92cef01f..409a15466214a6c3bd389f6e4671984275ef38b1 100644
--- a/mlair/helpers/join.py
+++ b/mlair/helpers/join.py
@@ -4,7 +4,7 @@ __date__ = '2019-10-16'
 
 import datetime as dt
 import logging
-from typing import Iterator, Union, List, Dict
+from typing import Iterator, Union, List, Dict, Tuple
 
 import pandas as pd
 import requests
@@ -25,15 +25,13 @@ class EmptyQueryResult(Exception):
 
 
 def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None,
-                  network_name: str = None, sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame,
-                                                                                                   pd.DataFrame]:
+                  sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame, pd.DataFrame]:
     """
     Read data from JOIN/TOAR.
 
     :param station_name: Station name e.g. DEBY122
    :param stat_var: key as variable like 'O3', values as statistics on keys like 'mean'
     :param station_type: set the station type like "traffic" or "background", can be none
-    :param network_name: set the measurement network like "UBA" or "AIRBASE", can be none
     :param sampling: sampling rate of the downloaded data, either set to daily or hourly (default daily)
     :param data_origin: additional dictionary to specify data origin as key (for variable) value (origin) pair. Valid
         origins are "REA" for reanalysis data and "" (empty string) for observational data.
@@ -43,11 +41,8 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
     # make sure station_name parameter is a list
     station_name = helpers.to_list(station_name)
 
-    # also ensure that given data_origin dict is no reference
-    if data_origin is None or len(data_origin) == 0:
-        data_origin = None
-    else:
-        data_origin = {k: v for (k, v) in data_origin.items()}
+    # split network and origin information
+    data_origin, network_name = split_network_and_origin(data_origin)
 
     # get data connection settings
     join_url_base, headers = join_settings(sampling)
@@ -105,6 +100,49 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
         raise EmptyQueryResult("No data found in JOIN.")
 
 
+def split_network_and_origin(origin_network_dict: dict) -> Tuple[Union[None, dict], Union[None, dict]]:
+    """
+    Split given dict into network and data origin.
+
+    This method is required to transform the Toar-Data v2 structure (using only origin) into the Toar-Data v1 (JOIN)
+    structure (which uses origin and network parameter). Furthermore, the EEA network (v2) is renamed to AIRBASE (v1).
+    """
+    if origin_network_dict is None or len(origin_network_dict) == 0:
+        data_origin, network = None, None
+    else:
+        data_origin = {}
+        network = {}
+        for k, v in origin_network_dict.items():
+            network[k] = []
+            for _network in helpers.to_list(v):
+                if _network.lower() == "eea":
+                    network[k].append("AIRBASE")
+                elif _network.lower() != "rea":
+                    network[k].append(_network)
+            if "REA" in v:
+                data_origin[k] = "REA"
+            else:
+                data_origin[k] = ""
+            network[k] = filter_network(network[k])
+    return data_origin, network
+
+
+def filter_network(network: list) -> Union[list, None]:
+    """
+    Filter given list of networks.
+
+    :param network: list of various network names (can contain duplicates)
+    :return: unique entries in their order of appearance with empty strings removed, or None if no entry remains
+    """
+    unique_networks = []
+    for v in list(filter(lambda x: x != "", network)):
+        if v not in unique_networks:
+            unique_networks.append(v)
+    if len(unique_networks) == 0:
+        unique_networks = None
+    return unique_networks
+
+
 def correct_data_format(data):
     """
     Transform to the standard data format.
@@ -190,7 +228,7 @@ def load_series_information(station_name: List[str], station_type: str_or_none,
     :return: all available series for requested station stored in an dictionary with parameter name (variable) as key
         and the series id as value.
     """
-    network_name_opts = network_name if network_name is None else ",".join(helpers.to_list(network_name))
+    network_name_opts = _create_network_name_opts(network_name)
     opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type,
             "network_name": network_name_opts, "as_dict": "true",
             "columns": "id,network_name,station_id,parameter_name,parameter_label,parameter_attribute"}
@@ -199,6 +237,21 @@
     return _select_distinct_series(station_vars, data_origin, network_name)
 
 
+def _create_network_name_opts(network_name):
+    if network_name is None:
+        network_name_opts = network_name
+    elif isinstance(network_name, list):
+        network_name_opts = ",".join(helpers.to_list(network_name))
+    elif isinstance(network_name, dict):
+        _network = []
+        for v in network_name.values():
+            _network.extend(helpers.to_list(v))
+        network_name_opts = ",".join(filter(lambda x: x is not None, set(_network)))
+    else:
+        raise TypeError(f"network_name parameter must be of type None, list, or dict. Given is {type(network_name)}.")
+    return network_name_opts
+
+
 def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_name: Union[str, List[str]] = None) -> \
     [Dict, Dict]:
     """
@@ -207,7 +260,7 @@ def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_
     data_origin = {} if data_origin is None else data_origin
     selected, data_origin = _select_distinct_data_origin(vars, data_origin)
 
-    network_name = [] if network_name is None else helpers.to_list(network_name)
+    network_name = [] if network_name is None else network_name
     selected = _select_distinct_network(selected, network_name)
 
     # extract id
@@ -215,7 +268,7 @@ def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_
     return selected, data_origin
 
 
-def _select_distinct_network(vars: dict, network_name: list) -> dict:
+def _select_distinct_network(vars: dict, network_name: Union[list, dict]) -> dict:
     """
     Select distinct series regarding network name. The order the network names are provided in parameter `network_name`
     indicates priority (from high to low). If no network name is provided, first entry is used and a logging info is
@@ -228,15 +281,18 @@
         indicating to use always first candidate for each variable.
    :return: dictionary with single series reference for each variable
     """
+    if isinstance(network_name, (list, str)):
+        network_name = {var: helpers.to_list(network_name) for var in vars.keys()}
     selected = {}
     for var, series in vars.items():
         res = []
-        for network in network_name:
+        network_list = network_name.get(var, []) or []
+        for network in network_list:
            res.extend(list(filter(lambda x: x["network_name"].upper() == network.upper(), series)))
         if len(res) > 0:  # use first match which has the highest priority
             selected[var] = res[0]
         else:
-            if len(network_name) == 0:  # just print message which network is used if none is provided
+            if len(network_list) == 0:  # just print message which network is used if none is provided
                 selected[var] = series[0]
                 logging.info(f"Could not find a valid match for variable {var} and networks {network_name}! "
                              f"Therefore, use first answer from JOIN: {series[0]}")
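The behaviour of the new `split_network_and_origin` helper can be pinned down with a concrete call. A hedged sketch, assuming the mlair package from this repository is importable and that `helpers.to_list` wraps scalars into lists; the variable-to-network mapping is invented:

```python
from mlair.helpers.join import split_network_and_origin

data_origin = {"o3": ["UBA", "EEA"], "temp": "REA", "press": ["REA"]}
origin, network = split_network_and_origin(data_origin)

# expected results, traced through the patched code above
assert origin == {"o3": "", "temp": "REA", "press": "REA"}
assert network == {"o3": ["UBA", "AIRBASE"], "temp": None, "press": None}
```

Tracing the code: "EEA" is renamed to "AIRBASE", "REA" moves into the origin dict, and variables whose network list ends up empty get `None` from `filter_network`.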
diff --git a/mlair/helpers/toar_data_v2.py b/mlair/helpers/toar_data_v2.py
index 766dd13e6f2d54746275bdde0867df39a32e9e1f..5cc67b6de8a49f00047bcad1a06a257a15ea25cb 100644
--- a/mlair/helpers/toar_data_v2.py
+++ b/mlair/helpers/toar_data_v2.py
@@ -19,6 +19,19 @@ str_or_none = Union[str, None]
 
 
 def download_toar(station_name: Union[str, List[str]], stat_var: dict,
                   sampling: str = "daily", data_origin: Dict = None):
+    """
+    Download data from https://toar-data.fz-juelich.de/api/v2/
+
+    Uses the station name to indicate the measurement site and the keys of stat_var to indicate the variable names. If
+    a data origin is given, this method tries to load time series for this origin. In case no origin is provided, this
+    method loads data with the highest priority according to toar-data's order parameter.
+
+    :param station_name: station name e.g. DEBY122
+    :param stat_var: key as variable like 'O3', values as statistics on keys like 'mean'
+    :param sampling: sampling rate of the downloaded data, either set to daily or hourly (default daily)
+    :param data_origin: additional dictionary to specify data origin as key (for variable) value (origin) pair
+    :return: downloaded data and its meta data
+    """
     # make sure station_name parameter is a list
     station_name = to_list(station_name)
@@ -49,24 +62,75 @@ def download_toar(station_name: Union[str, List[str]], stat_var: dict,
 
     # get data connection settings for data
     data_url_base, headers = toar_data_v2_settings(sampling)
 
+    data_dict = {}
     for var, meta in timeseries_meta.items():
         logging.debug(f"load {var}")
-
-        data_var = load_timeseries_data(meta, data_url_base, headers)
-
-    return
-
-
-def load_timeseries_data(timeseries_meta, url_base, headers):
+        meta, opts = prepare_meta(meta, sampling, stat_var, var)
+        data_var = load_timeseries_data([meta[0]], data_url_base, opts, headers)[0]
+        data_dict[var] = data_var
+    data = pd.DataFrame.from_dict(data_dict)
+
+    meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()})
+    meta = pd.DataFrame.from_dict(meta, orient='index')
+    meta.columns = station_name
+    return data, meta
+
+
+def prepare_meta(meta, sampling, stat_var, var):
+    meta = meta[0]
+    opts = {}
+    if sampling == "daily":
+        opts["timeseries_id"] = meta.pop("id")
+        meta["id"] = None
+        opts["names"] = stat_var[var]
+        opts["sampling"] = sampling
+    return [meta], opts
+
+
+def combine_meta_data(station_meta, timeseries_meta):
+    meta = {}
+    for k, v in station_meta.items():
+        if k == "codes":
+            meta[k] = v[0]
+        elif k in ["coordinates", "additional_metadata", "globalmeta"]:
+            for _key, _val in v.items():
+                if _key == "lng":
+                    meta["lon"] = _val
+                else:
+                    meta[_key] = _val
+        elif k in ["changelog", "roles", "annotations", "aux_images", "aux_docs", "aux_urls"]:
+            continue
+        else:
+            meta[k] = v
+    for var, var_meta in timeseries_meta.items():
+        for k, v in var_meta.items():
+            if k in ["additional_metadata", "station", "programme", "annotations", "changelog"]:
+                continue
+            elif k == "roles":
+                for _key, _val in v[0]["contact"]["organisation"].items():
+                    new_k = f"{var}_organisation_{_key}"
+                    meta[new_k] = _val
+            elif k == "variable":
+                for _key, _val in v.items():
+                    new_k = f"{var}_{_key}"
+                    meta[new_k] = _val
+            else:
+                new_k = f"{var}_{k}"
+                meta[new_k] = v
+    return meta
+
+
+def load_timeseries_data(timeseries_meta, url_base, opts, headers):
     coll = []
     for meta in timeseries_meta:
         series_id = meta["id"]
         # opts = {"base": url_base, "service": f"data/timeseries/{series_id}"}
-        opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv"}
-        res = get_data(opts, headers, as_json=False)
+        _opts = {"base": url_base, "service": "data/timeseries", "param_id": series_id, "format": "csv", **opts}
+        res = get_data(_opts, headers, as_json=False)
         data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True,
                            infer_datetime_format=True)
-        coll.append(data["value"])
+        data = data["value"].rename(meta["variable"]["name"])
+        coll.append(data)
     return coll
 