Commit 2fff7bd8 authored by lukas leufen's avatar lukas leufen 👻
Browse files

Running toar-data v2 downloads; toar-data v1 (JOIN) is also updated to use the same parameters.

parent 218b6d9a
Pipeline #104709 failed with stages
in 7 minutes and 33 seconds
......@@ -382,31 +382,31 @@ class DataHandlerSingleStation(AbstractDataHandler):
:return: downloaded data and its meta data
"""
df_all = {}
df_era5, df_join = None, None
meta_era5, meta_join = None, None
df_era5, df_toar = None, None
meta_era5, meta_toar = None, None
if data_origin is not None:
era5_origin = filter_dict_by_value(data_origin, "era5", True)
era5_stats = select_from_dict(statistics_per_var, era5_origin.keys())
join_origin = filter_dict_by_value(data_origin, "era5", False)
join_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False)
assert len(era5_origin) + len(join_origin) == len(data_origin)
assert len(era5_stats) + len(join_stats) == len(statistics_per_var)
toar_origin = filter_dict_by_value(data_origin, "era5", False)
toar_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False)
assert len(era5_origin) + len(toar_origin) == len(data_origin)
assert len(era5_stats) + len(toar_stats) == len(statistics_per_var)
else:
era5_origin, join_origin = None, None
era5_stats, join_stats = statistics_per_var, statistics_per_var
era5_origin, toar_origin = None, None
era5_stats, toar_stats = statistics_per_var, statistics_per_var
# load data
if era5_origin is not None and len(era5_stats) > 0:
# load era5 data
df_era5, meta_era5 = era5.load_era5(station_name=station, stat_var=era5_stats, sampling=sampling,
data_origin=era5_origin)
if join_origin is None or len(join_stats) > 0:
if toar_origin is None or len(toar_stats) > 0:
# load join data
_ = toar_data_v2.download_toar(station, join_stats, sampling=sampling, data_origin=join_origin)
df_join, meta_join = join.download_join(station_name=station, stat_var=join_stats, station_type=station_type,
network_name=network, sampling=sampling, data_origin=join_origin)
df = pd.concat([df_era5, df_join], axis=1, sort=True)
meta = meta_era5 if meta_era5 is not None else meta_join
# df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=toar_origin)
df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling,
station_type=station_type, data_origin=toar_origin)
df = pd.concat([df_era5, df_toar], axis=1, sort=True)
meta = meta_era5 if meta_era5 is not None else meta_toar
meta.loc["data_origin"] = str(data_origin)
df_all[station[0]] = df
......
......@@ -36,9 +36,9 @@ def load_era5(station_name, stat_var, sampling, data_origin):
meta = load_meta_data(station_name, None, None, join_url_base, headers)
# sel data for station using sel method nearest
data = xr.open_mfdataset(os.path.join(data_path, file_names))
station_dask = data.sel(lon=meta["station_lon"], lat=meta["station_lat"], method="nearest", drop=True)
station_data = station_dask.to_array().T.compute()
with xr.open_mfdataset(os.path.join(data_path, file_names)) as data:
station_dask = data.sel(lon=meta["station_lon"], lat=meta["station_lat"], method="nearest", drop=True)
station_data = station_dask.to_array().T.compute()
# transform data and meta to pandas
station_data = station_data.to_pandas()
......
......@@ -4,7 +4,7 @@ __date__ = '2019-10-16'
import datetime as dt
import logging
from typing import Iterator, Union, List, Dict
from typing import Iterator, Union, List, Dict, Tuple
import pandas as pd
import requests
......@@ -25,15 +25,13 @@ class EmptyQueryResult(Exception):
def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None,
network_name: str = None, sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame,
pd.DataFrame]:
sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame, pd.DataFrame]:
"""
Read data from JOIN/TOAR.
:param station_name: Station name e.g. DEBY122
:param stat_var: key as variable like 'O3', values as statistics on keys like 'mean'
:param station_type: set the station type like "traffic" or "background", can be none
:param network_name: set the measurement network like "UBA" or "AIRBASE", can be none
:param sampling: sampling rate of the downloaded data, either set to daily or hourly (default daily)
:param data_origin: additional dictionary to specify data origin as key (for variable) value (origin) pair. Valid
origins are "REA" for reanalysis data and "" (empty string) for observational data.
......@@ -43,11 +41,8 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
# make sure station_name parameter is a list
station_name = helpers.to_list(station_name)
# also ensure that given data_origin dict is no reference
if data_origin is None or len(data_origin) == 0:
data_origin = None
else:
data_origin = {k: v for (k, v) in data_origin.items()}
# split network and origin information
data_origin, network_name = split_network_and_origin(data_origin)
# get data connection settings
join_url_base, headers = join_settings(sampling)
......@@ -105,6 +100,49 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
raise EmptyQueryResult("No data found in JOIN.")
def split_network_and_origin(origin_network_dict: dict) -> Tuple[Union[None, dict], Union[None, dict]]:
    """
    Split given dict into network and data origin.

    Method is required to transform Toar-Data v2 structure (using only origin) into Toar-Data v1 (JOIN) structure (which
    uses origin and network parameter). Furthermore, EEA network (v2) is renamed to AIRBASE (v1).

    :param origin_network_dict: dict mapping each variable to a network/origin name or a list of such names
    :return: tuple of (data origin dict, network dict); both are None if no input is given
    """
    if origin_network_dict is None or len(origin_network_dict) == 0:
        return None, None
    data_origin = {}
    network = {}
    for k, v in origin_network_dict.items():
        # normalise to a list once; avoids substring matching when v is a plain string
        values = helpers.to_list(v)
        network[k] = []
        for _network in values:
            if _network.lower() == "eea":
                network[k].append("AIRBASE")  # EEA (v2 naming) is called AIRBASE in JOIN (v1)
            elif _network.lower() != "rea":
                network[k].append(_network)
        # detect reanalysis origin case-insensitively, consistent with the network filter above
        if any(str(_v).lower() == "rea" for _v in values):
            data_origin[k] = "REA"
        else:
            data_origin[k] = ""
        network[k] = filter_network(network[k])
    return data_origin, network
def filter_network(network: list) -> Union[list, None]:
    """
    Filter given list of networks.

    Drops empty-string entries and removes duplicates while preserving the order of first occurrence.

    :param network: list of various network names (can contain duplicates)
    :return: ordered list with unique, non-empty entries, or None if nothing remains
    """
    unique_networks = []
    for name in network:
        if name != "" and name not in unique_networks:
            unique_networks.append(name)
    return unique_networks or None
def correct_data_format(data):
"""
Transform to the standard data format.
......@@ -190,7 +228,7 @@ def load_series_information(station_name: List[str], station_type: str_or_none,
:return: all available series for requested station stored in an dictionary with parameter name (variable) as key
and the series id as value.
"""
network_name_opts = network_name if network_name is None else ",".join(helpers.to_list(network_name))
network_name_opts = _create_network_name_opts(network_name)
opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type,
"network_name": network_name_opts, "as_dict": "true",
"columns": "id,network_name,station_id,parameter_name,parameter_label,parameter_attribute"}
......@@ -199,6 +237,21 @@ def load_series_information(station_name: List[str], station_type: str_or_none,
return _select_distinct_series(station_vars, data_origin, network_name)
def _create_network_name_opts(network_name):
    """
    Convert the network name parameter into a comma-separated string usable as JOIN query option.

    :param network_name: None, a list of network names, or a dict mapping variables to network name(s)
    :return: None if no network name is given, otherwise a comma-separated string of network names
    :raises TypeError: if network_name is neither None, list, nor dict
    """
    if network_name is None:
        return None
    if isinstance(network_name, list):
        return ",".join(helpers.to_list(network_name))
    if isinstance(network_name, dict):
        # collect all network names over all variables and drop duplicates / None entries
        collected = []
        for names in network_name.values():
            collected.extend(helpers.to_list(names))
        return ",".join(filter(lambda x: x is not None, set(collected)))
    raise TypeError(f"network_name parameter must be of type None, list, or dict. Given is {type(network_name)}.")
def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_name: Union[str, List[str]] = None) -> \
[Dict, Dict]:
"""
......@@ -207,7 +260,7 @@ def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_
data_origin = {} if data_origin is None else data_origin
selected, data_origin = _select_distinct_data_origin(vars, data_origin)
network_name = [] if network_name is None else helpers.to_list(network_name)
network_name = [] if network_name is None else network_name
selected = _select_distinct_network(selected, network_name)
# extract id
......@@ -215,7 +268,7 @@ def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_
return selected, data_origin
def _select_distinct_network(vars: dict, network_name: list) -> dict:
def _select_distinct_network(vars: dict, network_name: Union[list, dict]) -> dict:
"""
Select distinct series regarding network name. The order the network names are provided in parameter `network_name`
indicates priority (from high to low). If no network name is provided, first entry is used and a logging info is
......@@ -228,15 +281,18 @@ def _select_distinct_network(vars: dict, network_name: list) -> dict:
indicating to use always first candidate for each variable.
:return: dictionary with single series reference for each variable
"""
if isinstance(network_name, (list, str)):
network_name = {var: helpers.to_list(network_name) for var in vars.keys()}
selected = {}
for var, series in vars.items():
res = []
for network in network_name:
network_list = network_name.get(var, []) or []
for network in network_list:
res.extend(list(filter(lambda x: x["network_name"].upper() == network.upper(), series)))
if len(res) > 0: # use first match which has the highest priority
selected[var] = res[0]
else:
if len(network_name) == 0: # just print message which network is used if none is provided
if len(network_list) == 0: # just print message which network is used if none is provided
selected[var] = series[0]
logging.info(f"Could not find a valid match for variable {var} and networks {network_name}! "
f"Therefore, use first answer from JOIN: {series[0]}")
......
......@@ -19,6 +19,19 @@ str_or_none = Union[str, None]
def download_toar(station_name: Union[str, List[str]], stat_var: dict,
sampling: str = "daily", data_origin: Dict = None):
"""
Download data from https://toar-data.fz-juelich.de/api/v2/
Uses station name to indicate measurement site and keys of stat_var to indicate variable name. If data origin is
given, this method tries to load time series for this origin. In case no origin is provided, this method loads data
with the highest priority according to toar-data's order parameter.
:param station_name:
:param stat_var:
:param sampling:
:param data_origin:
:return:
"""
# make sure station_name parameter is a list
station_name = to_list(station_name)
......@@ -49,24 +62,79 @@ def download_toar(station_name: Union[str, List[str]], stat_var: dict,
# get data connection settings for data
data_url_base, headers = toar_data_v2_settings(sampling)
data_dict = {}
for var, meta in timeseries_meta.items():
logging.debug(f"load {var}")
data_var = load_timeseries_data(meta, data_url_base, headers)
return
def load_timeseries_data(timeseries_meta, url_base, headers):
meta, opts = prepare_meta(meta, sampling, stat_var, var)
data_var = load_timeseries_data([meta[0]], data_url_base, opts, headers)[0]
data_dict[var] = data_var
data = pd.DataFrame.from_dict(data_dict)
meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()})
meta = pd.DataFrame.from_dict(meta, orient='index')
meta.columns = station_name
return data, meta
def prepare_meta(meta, sampling, stat_var, var):
    """
    Prepare meta data and request options for a single time series download.

    For daily sampling the series id is moved from the meta data into the request options (key ``timeseries_id``)
    and the id entry in the meta data is cleared afterwards.

    :param meta: list containing a single meta data dict of the time series
    :param sampling: sampling rate of the requested data (e.g. "daily" or "hourly")
    :param stat_var: dict mapping variable names to the requested statistics
    :param var: name of the variable to prepare the request for
    :return: tuple of ([meta dict], request options dict)
    """
    series_meta = meta[0]
    request_opts = {}
    if sampling == "daily":
        # daily aggregates are requested by series id, so move it from meta into the request options
        request_opts["timeseries_id"] = series_meta.pop("id")
        series_meta["id"] = None
    request_opts["names"] = stat_var[var]
    request_opts["sampling"] = sampling
    return [series_meta], request_opts
def combine_meta_data(station_meta, timeseries_meta):
    """
    Combine station meta data and time series meta data into a single flat dictionary.

    Station entries are copied mostly as-is: "codes" is reduced to its first element, nested mappings
    ("coordinates", "additional_metadata", "globalmeta") are flattened one level (renaming "lng" to "lon"),
    and bookkeeping entries ("changelog", "roles", ...) are dropped. Time series entries are prefixed with the
    variable name; the organisation contact of the first role and the "variable" sub-dict are flattened, too.

    Note: removed leftover debug ``print`` calls from the original implementation.

    :param station_meta: dict with station meta data
    :param timeseries_meta: dict mapping variable names to their time series meta data dict
    :return: flat dict with combined meta data
    """
    meta = {}
    for k, v in station_meta.items():
        if k == "codes":
            meta[k] = v[0]  # use first station code only
        elif k in ["coordinates", "additional_metadata", "globalmeta"]:
            for _key, _val in v.items():
                if _key == "lng":
                    meta["lon"] = _val  # rename longitude key
                else:
                    meta[_key] = _val
        elif k in ["changelog", "roles", "annotations", "aux_images", "aux_docs", "aux_urls"]:
            continue  # skip bookkeeping / auxiliary entries
        else:
            meta[k] = v
    for var, var_meta in timeseries_meta.items():
        for k, v in var_meta.items():
            if k in ["additional_metadata", "station", "programme", "annotations", "changelog"]:
                continue  # skip entries that are redundant or not required here
            elif k == "roles":
                # flatten organisation contact of the first role
                for _key, _val in v[0]["contact"]["organisation"].items():
                    new_k = f"{var}_organisation_{_key}"
                    meta[new_k] = _val
            elif k == "variable":
                for _key, _val in v.items():
                    new_k = f"{var}_{_key}"
                    meta[new_k] = _val
            else:
                new_k = f"{var}_{k}"
                meta[new_k] = v
    return meta
def load_timeseries_data(timeseries_meta, url_base, opts, headers):
    """
    Load time series data for all given time series meta data entries.

    Note: cleaned up leftover pre-refactor lines (a shadowed ``opts`` assignment and a duplicate append of the
    raw "value" column) and removed commented-out code.

    :param timeseries_meta: list of time series meta data dicts; each must provide an "id" and a "variable" entry
    :param url_base: base url of the TOAR data rest service
    :param opts: additional request options (e.g. names, sampling, timeseries_id), merged into the query
    :param headers: request headers to use for the data download
    :return: list of pandas series, one per requested time series, each named after its variable
    """
    coll = []
    for meta in timeseries_meta:
        series_id = meta["id"]
        # request series as csv; caller-provided opts extend (and may override) the base query parameters
        request_opts = {"base": url_base, "service": "data/timeseries", "param_id": series_id, "format": "csv",
                        **opts}
        res = get_data(request_opts, headers, as_json=False)
        data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True,
                           infer_datetime_format=True)
        coll.append(data["value"].rename(meta["variable"]["name"]))
    return coll
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment