Skip to content
Snippets Groups Projects
Commit ff8b044a authored by leufen1's avatar leufen1
Browse files

can now use toar statistics api v2

parent b4e4114b
No related branches found
No related tags found
3 merge requests!522filter can now combine obs, forecast, and apriori for first iteration. Further...,!521Resolve "release v2.4.0",!518Resolve "Use Toar statistics api v2"
Pipeline #142777 passed
......@@ -85,6 +85,36 @@ class EmptyQueryResult(Exception):
pass
def get_data_with_query(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5, timeout_base=60) -> bytes:
    """
    Download data from statistics rest api. This API is based on three steps: (1) post query and retrieve job id, (2)
    read status of id until finished, (3) download data with job id.

    :param opts: keyword arguments forwarded to create_url to build the request url
    :param headers: headers passed along with the initial request
    :param as_json: unused here; kept only for signature compatibility with get_data
    :param max_retries: number of additional attempts after the first failed one
    :param timeout_base: polling budget in seconds for the first attempt; doubled on each retry
    :returns: raw response body (bytes) of the finished job
    :raises EmptyQueryResult: if all max_retries + 1 attempts fail
    """
    url = create_url(**opts)
    response_error = None
    for retry in range(max_retries + 1):
        time.sleep(random.random())  # random jitter so parallel workers do not hammer the api in sync
        try:
            # exponential back-off of the polling budget: 60s, 120s, 240s, ...
            timeout = timeout_base * (2 ** retry)
            logging.info(f"connect (retry={retry}, timeout={timeout}) {url}")
            start_time = time.time()
            with TimeTracking(name=url):
                session = retries_session(max_retries=0)
                # step (1): post the query, response body carries the job's status url
                response = session.get(url, headers=headers, timeout=(5, 5))  # timeout=(open, read)
                # step (2): poll the status url until the job redirects to the result
                while (time.time() - start_time) < timeout:
                    response = requests.get(response.json()["status"], timeout=(5, 5))
                    if response.history:  # a redirect in the history means the job finished
                        break
                    time.sleep(2)
            # step (3): the final response already contains the data
            return response.content
        except Exception as e:
            time.sleep(retry)  # brief linear pause before the next attempt (0s on first failure)
            logging.debug(f"There was an error for request {url}: {e}")
            response_error = e
            # only give up once the final allowed attempt (retry == max_retries) has failed;
            # the previous `retry + 1 >= max_retries` check aborted one iteration too early
            if retry >= max_retries:
                raise EmptyQueryResult(f"There was a RetryError for request {url}: {response_error}") from e
def get_data(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5, timeout_base=60) -> Union[Dict, List, str]:
"""
Download join data using requests framework.
......
......@@ -10,10 +10,12 @@ from io import StringIO
import pandas as pd
import pytz
from timezonefinder import TimezoneFinder
from io import BytesIO
import zipfile
from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
from mlair.helpers import to_list
from mlair.helpers.data_sources.data_loader import EmptyQueryResult, get_data, correct_stat_name
from mlair.helpers.data_sources.data_loader import EmptyQueryResult, get_data, correct_stat_name, get_data_with_query
str_or_none = Union[str, None]
......@@ -120,9 +122,9 @@ def prepare_meta(meta, sampling, stat_var, var):
for m in meta:
opts = {}
if sampling == "daily":
opts["timeseries_id"] = m.pop("id")
opts["id"] = m.pop("id")
m["id"] = None
opts["names"] = stat_var[var]
opts["statistics"] = stat_var[var]
opts["sampling"] = sampling
out.append(([m], opts))
return out
......@@ -167,17 +169,32 @@ def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling):
series_id = meta["id"]
# opts = {"base": url_base, "service": f"data/timeseries/{series_id}"}
opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv", **opts}
if sampling != "hourly":
opts["service"] = None
if sampling == "hourly":
res = get_data(opts, headers, as_json=False)
data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True,
infer_datetime_format=True)
data = extract_timeseries_data(res, "string")
else:
opts["service"] = None
opts["format"] = None
res = get_data_with_query(opts, headers, as_json=False)
data = extract_timeseries_data(res, "bytes")
if len(data.index) > 0:
data = data[correct_stat_name(opts.get("names", "value"))].rename(meta["variable"]["name"])
data = data[correct_stat_name(opts.get("statistics", "value"))].rename(meta["variable"]["name"])
coll.append(data)
return coll
def extract_timeseries_data(result, result_format):
    """
    Parse a downloaded timeseries payload into a pandas DataFrame.

    :param result: raw payload, either csv text (str) or a zip archive (bytes) containing a single csv file
    :param result_format: "string" for plain csv text, "bytes" for a zipped csv
    :returns: DataFrame indexed by the parsed "datetime" column
    :raises ValueError: if result_format is neither "string" nor "bytes"
    """
    if result_format == "string":
        # plain csv text (hourly endpoint); the deprecated/removed pandas kwarg
        # infer_datetime_format was dropped - parse_dates alone is sufficient
        return pd.read_csv(StringIO(result), comment="#", index_col="datetime", parse_dates=True)
    elif result_format == "bytes":
        # zip archive (statistics query endpoint); the csv is the archive's first member
        with zipfile.ZipFile(BytesIO(result)) as file:
            first_member = file.namelist()[0]
            return pd.read_csv(BytesIO(file.read(first_member)), comment="#", index_col="datetime",
                               parse_dates=True)
    else:
        raise ValueError(f"Unknown result format given: {result_format}")
def load_station_information(station_name: List[str], url_base: str, headers: Dict):
# opts = {"base": url_base, "service": f"stationmeta/{station_name[0]}"}
opts = {"base": url_base, "service": f"stationmeta", "param_id": station_name[0]}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment