Commit 218b6d9a authored by lukas leufen's avatar lukas leufen 👻
Browse files

first implementation of toar-data-v2, can load data (but cannot process these...

first implementation of toar-data-v2; can load data (but cannot process it yet), savepoint, #396
parent 9afc74b7
Pipeline #104461 failed with stages
in 8 minutes and 20 seconds
"""Settings to access"""
from typing import Tuple, Dict
def toar_data_v2_settings(sampling="daily") -> Tuple[str, Dict]:
    """
    Set url for toar-data and required headers.

    Headers information is not required for now.

    :param sampling: temporal resolution to access (either "daily", "hourly", or "meta").
    :return: Service url and optional headers
    :raises NameError: if sampling is none of the supported values
    """
    # NOTE(review): the concrete service URLs were lost in this excerpt and are
    # reconstructed from the public TOAR-DATA v2 API layout — confirm upstream.
    if sampling == "daily":  # pragma: no branch
        TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2/analysis/statistics/"
        headers = {}
    elif sampling == "hourly" or sampling == "meta":
        TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2/"
        headers = {}
    else:
        # restored else-branch: as scraped, the raise was unconditional and made
        # the function unusable for the supported sampling values
        raise NameError(f"Given sampling {sampling} is not supported, choose from either daily or hourly sampling.")
    return TOAR_SERVICE_URL, headers
......@@ -22,6 +22,7 @@ from mlair.configuration import check_path_and_create
from mlair import helpers
from mlair.helpers import join, statistics, TimeTrackingWrapper, filter_dict_by_value, select_from_dict, era5
from mlair.data_handler.abstract_data_handler import AbstractDataHandler
from mlair.helpers import toar_data_v2
# define a more general date type for type hinting
# (fix: the left member of the Union was lost, leaving invalid syntax)
date = Union[dt.date, dt.datetime]
......@@ -401,6 +402,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
if join_origin is None or len(join_stats) > 0:
# load join data
_ = toar_data_v2.download_toar(station, join_stats, sampling=sampling, data_origin=join_origin)
df_join, meta_join = join.download_join(station_name=station, stat_var=join_stats, station_type=station_type,
network_name=network, sampling=sampling, data_origin=join_origin)
df = pd.concat([df_era5, df_join], axis=1, sort=True)
......@@ -125,7 +125,7 @@ def correct_data_format(data):
return formatted
def get_data(opts: Dict, headers: Dict, as_json: bool = True) -> Union[Dict, List, str]:
    """
    Download join data using requests framework.

    Data is requested by remote server.

    :param opts: options to create the request url
    :param headers: additional headers information like authorization, can be empty
    :param as_json: extract response as json if true (default True)
    :return: requested data (either as list or dictionary, or the raw response text if as_json is False)
    :raises EmptyQueryResult: if the response status code is not 200
    """
    url = create_url(**opts)
    # timeout=(open, read): read timeout is None, i.e. the read may block
    # indefinitely once the connection is established
    response = retries_session().get(url, headers=headers, timeout=(5, None))
    if response.status_code == 200:
        return response.json() if as_json is True else response.text
    else:
        raise EmptyQueryResult(f"There was an error (STATUS {response.status_code}) for request {url}")
......@@ -322,7 +323,7 @@ def _lower_list(args: List[str]) -> Iterator[str]:
yield string.lower()
def create_url(base: str, service: str, param_id: str = None, **kwargs: Union[str, int, float, None]) -> str:
    """
    Create a request url with given base url, service type and arbitrarily many additional keyword arguments.

    :param base: basic url of the rest service
    :param service: service type, e.g. "search" or "stationmeta"
    :param param_id: optional id appended to the url as a path component (e.g. a station code or series id)
    :param kwargs: query parameters to append; entries whose value is None are skipped
    :return: combined url as string
    """
    if not base.endswith("/"):
        base += "/"
    url = f"{base}{service}"
    if not url.endswith("/"):
        url += "/"
    if param_id is not None:
        url = f"{url}{param_id}"
    # build the query string first so that a call whose kwargs are all None
    # does not leave a dangling "?" at the end of the url
    query = "&".join(f"{k}={v}" for k, v in kwargs.items() if v is not None)
    if len(query) > 0:
        url = f"{url}?{query}"
    return url
"""Functions to access"""
__author__ = 'Lukas Leufen'
__date__ = '2022-06-30'
import logging
from typing import Iterator, Union, List, Dict
from io import StringIO
import pandas as pd
from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
from mlair.helpers import to_list
from mlair.helpers.join import EmptyQueryResult, get_data
# type alias for values that may be a string or absent (None)
str_or_none = Union[str, None]
def download_toar(station_name: Union[str, List[str]], stat_var: dict,
                  sampling: str = "daily", data_origin: Dict = None):
    """
    Download data from TOAR-DATA (v2) for a single station.

    Loads variable, station and timeseries metadata first, then requests the
    actual series data. NOTE(review): this is a savepoint implementation — the
    downloaded series are not yet combined or returned (see #396).

    :param station_name: station code (or list of codes; only the first is used downstream)
    :param stat_var: dict of variables to load (keys are variable names)
    :param sampling: temporal resolution, e.g. "daily" or "hourly"
    :param data_origin: optional mapping variable -> preferred data origin(s)
    """
    # make sure station_name parameter is a list
    station_name = to_list(station_name)

    # also ensure that given data_origin dict is no reference
    if data_origin is None or len(data_origin) == 0:
        data_origin = None
    else:
        # restored else-branch: as scraped, the copy ran unconditionally and
        # would crash on data_origin=None
        data_origin = {k: v for (k, v) in data_origin.items()}

    # get data connection settings for meta
    meta_url_base, headers = toar_data_v2_settings("meta")

    # load variables
    var_meta = load_variables_information(stat_var, meta_url_base, headers)

    # load station meta
    station_meta = load_station_information(station_name, meta_url_base, headers)

    # load series information
    timeseries_meta = load_timeseries_information(station_meta, var_meta, meta_url_base, headers, data_origin)

    logging.debug(f"load data for {station_meta['codes'][0]} from TOAR-DATA")
    # get data connection settings for data
    data_url_base, headers = toar_data_v2_settings(sampling)

    for var, meta in timeseries_meta.items():
        logging.debug(f"load {var}")
        # NOTE(review): result is currently unused (savepoint) — confirm intent
        data_var = load_timeseries_data(meta, data_url_base, headers)
def load_timeseries_data(timeseries_meta, url_base, headers):
    """
    Download the actual time series for each series listed in *timeseries_meta*.

    :param timeseries_meta: list of timeseries meta dicts, each providing an "id" entry
    :param url_base: service url of the data endpoint
    :param headers: request headers, can be empty
    :return: list of DataFrames, one per requested series
    """
    coll = []
    for meta in timeseries_meta:
        series_id = meta["id"]
        # request the series as csv; "#"-prefixed lines in the response are meta comments
        opts = {"base": url_base, "service": "data/timeseries", "param_id": series_id, "format": "csv"}
        res = get_data(opts, headers, as_json=False)
        data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True)
        # NOTE(review): the append was lost in this excerpt — without it the
        # function always returned an empty list; reconstructed, confirm upstream
        coll.append(data)
    return coll
def load_station_information(station_name: List[str], url_base: str, headers: Dict):
    """
    Request station metadata for the first given station code.

    :param station_name: list of station codes; only the first entry is queried
    :param url_base: service url of the meta endpoint
    :param headers: request headers, can be empty
    :return: station metadata as returned by the service
    """
    # "stationmeta" is a plain literal (was an f-string without placeholders);
    # dead commented-out variant of opts removed
    opts = {"base": url_base, "service": "stationmeta", "param_id": station_name[0]}
    return get_data(opts, headers)
def load_timeseries_information(station_meta, var_meta, url_base: str, headers: Dict,
                                data_origin: Dict = None) -> Dict:
    """
    Search available timeseries for a station and each variable.

    For every variable a list of matching timeseries meta entries is collected.
    If a data origin is given for a variable, series are filtered by origin
    first; when that filter yields nothing (or no origin is given), series are
    selected by their "order" attribute instead.

    :param station_meta: station metadata providing "id" and "codes" entries
    :param var_meta: dict mapping variable name to its variable metadata ("id" entry)
    :param url_base: service url of the meta endpoint
    :param headers: request headers, can be empty
    :param data_origin: optional mapping variable -> preferred origin(s)
    :return: dict mapping variable name to the selected timeseries meta entries
    :raises EmptyQueryResult: if no timeseries exist for one or more variables
    """
    # fixed return annotation: the scraped "[Dict, Dict]" was a list-literal
    # annotation although a single dict is returned
    timeseries_id_dict = {}
    missing = []
    for var, meta in var_meta.items():
        timeseries_id_dict[var] = []
        opts = {"base": url_base, "service": "search", "station_id": station_meta["id"], "variable_id": meta["id"]}
        res = get_data(opts, headers)
        if len(res) == 0:
            missing.append((var, meta))
        if data_origin is not None:
            # NOTE(review): raises KeyError if var is missing from data_origin —
            # confirm callers always provide an entry per variable
            var_origin = data_origin[var]
            timeseries_id_dict[var] = select_timeseries_by_origin(res, var_origin)
        if data_origin is None or len(timeseries_id_dict[var]) == 0:
            timeseries_id_dict[var] = select_timeseries_by_order(res)
    if len(missing) > 0:
        missing = ",".join([f"{m[0]} ({m[1]['id']})" for m in missing])
        raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
                               f"({station_meta['codes'][0]}) and variables {missing}.")
    return timeseries_id_dict
def select_timeseries_by_order(toar_meta):
    """
    Sort timeseries meta entries by their "order" attribute.

    When several entries share the same order value, the last one wins.

    :param toar_meta: iterable of timeseries meta dicts with an "order" entry
    :return: list of meta entries in ascending order
    """
    by_order = {entry["order"]: entry for entry in toar_meta}
    return [by_order[key] for key in sorted(by_order)]
def select_timeseries_by_origin(toar_meta, var_origin):
    """
    Filter timeseries meta entries by data origin.

    An entry matches when the organisation name of any of its role contacts
    equals one of the given origins (case-insensitive comparison).

    :param toar_meta: iterable of timeseries meta dicts with a "roles" entry
    :param var_origin: single origin string or list of origins
    :return: list of all matching meta entries
    """
    res = []
    for origin in to_list(var_origin):
        for meta in toar_meta:
            for role in meta["roles"]:
                if role["contact"]["organisation"]["name"].lower() == origin.lower():
                    # NOTE(review): the collecting append was lost in this
                    # excerpt (res stayed empty); reconstructed — confirm upstream
                    res.append(meta)
    return res
def load_variables_information(var_dict, url_base, headers):
    """
    Request variable metadata for each variable key in *var_dict*.

    :param var_dict: dict whose keys are the variable names to query
    :param url_base: service url of the meta endpoint
    :param headers: request headers, can be empty
    :return: dict mapping variable name to its metadata
    """
    # "variables" is a plain literal (was an f-string without placeholders);
    # dead commented-out variant of opts removed
    var_meta_dict = {}
    for var in var_dict.keys():
        opts = {"base": url_base, "service": "variables", "param_id": var}
        var_meta_dict[var] = get_data(opts, headers)
    return var_meta_dict
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment