Commit 50d5c130 authored by lukas leufen

Merge branch 'develop' into 'michael_issue450_feat_load-ifs-data'

# Conflicts:
#   mlair/data_handler/data_handler_single_station.py
parents e31a8e5d 8422d164
3 merge requests:
  !522 filter can now combine obs, forecast, and apriori for first iteration. Further...
  !521 Resolve "release v2.4.0"
  !517 Resolve "load ifs data"
Pipeline #143735 failed
@@ -10,7 +10,7 @@ def toar_data_v2_settings(sampling="daily") -> Tuple[str, Dict]:
     :return: Service url and optional headers
     """
     if sampling == "daily":  # pragma: no branch
-        TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/statistics/api/v1/"
+        TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2/analysis/statistics/"
         headers = {}
     elif sampling == "hourly" or sampling == "meta":
         TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2/"
...
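For orientation, toar_data_v2_settings maps the sampling keyword to a service URL plus optional request headers, and the daily branch now targets the v2 analysis/statistics endpoint instead of the previous statistics/api/v1 service. A minimal usage sketch (the import path is taken from the toar_data_v2 import hunk further below; the call sites are not shown in this diff):

    from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings

    # service url and optional headers for daily aggregated statistics
    url_base, headers = toar_data_v2_settings(sampling="daily")
    # after this commit: "https://toar-data.fz-juelich.de/api/v2/analysis/statistics/"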
@@ -335,7 +335,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
         file_name = self._set_file_name(path, station, statistics_per_var)
         meta_file = self._set_meta_file_name(path, station, statistics_per_var)
         if self.overwrite_local_data is True:
-            logging.debug(f"overwrite_local_data is true, therefore reload {file_name}")
+            logging.debug(f"{self.station[0]}: overwrite_local_data is true, therefore reload {file_name}")
             if os.path.exists(file_name):
                 os.remove(file_name)
             if os.path.exists(meta_file):
@@ -344,22 +344,22 @@ class DataHandlerSingleStation(AbstractDataHandler):
                                                      store_data_locally=store_data_locally, data_origin=data_origin,
                                                      time_dim=self.time_dim, target_dim=self.target_dim,
                                                      iter_dim=self.iter_dim, window_dim=self.window_dim)
-            logging.debug(f"loaded new data")
+            logging.debug(f"{self.station[0]}: loaded new data")
         else:
             try:
-                logging.debug(f"try to load local data from: {file_name}")
+                logging.debug(f"{self.station[0]}: try to load local data from: {file_name}")
                 data = xr.open_dataarray(file_name)
                 meta = pd.read_csv(meta_file, index_col=0)
                 self.check_station_meta(meta, station, data_origin, statistics_per_var)
-                logging.debug("loading finished")
+                logging.debug(f"{self.station[0]}: loading finished")
             except FileNotFoundError as e:
-                logging.debug(e)
-                logging.debug(f"load new data")
+                logging.debug(f"{self.station[0]}: {e}")
+                logging.debug(f"{self.station[0]}: load new data")
                 data, meta = data_sources.download_data(file_name, meta_file, station, statistics_per_var, sampling,
                                                         store_data_locally=store_data_locally, data_origin=data_origin,
                                                         time_dim=self.time_dim, target_dim=self.target_dim,
                                                         iter_dim=self.iter_dim)
-                logging.debug("loading finished")
+                logging.debug(f"{self.station[0]}: loading finished")
         # create slices and check for negative concentration.
         data = self._slice_prep(data, start=start, end=end)
         data = self.check_for_negative_concentrations(data)
@@ -378,7 +378,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
                 continue
             m = ast.literal_eval(meta.at[k, station[0]])
             if not check_nested_equality(select_from_dict(m, v.keys()), v):
-                logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
+                logging.debug(f"{station[0]}: meta data does not agree with given request for {k}: {v} (requested) != "
                               f"{m} (local). Raise FileNotFoundError to trigger new grapping from web.")
                 raise FileNotFoundError
...
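The mismatch branch above intentionally raises FileNotFoundError so that the except FileNotFoundError handler in the previous hunk falls back to a fresh download. A tiny illustration of the idea using plain dict comparison, since the internals of select_from_dict and check_nested_equality are not part of this diff (the values are hypothetical):

    requested = {"sampling": "daily"}                # what the current run asks for
    local = {"sampling": "hourly", "name": "toar"}   # what the cached meta file records

    # compare only the requested keys against the locally stored meta
    if {k: local.get(k) for k in requested} != requested:
        raise FileNotFoundError("meta mismatch, trigger new download from web")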
@@ -374,7 +374,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation):
     def apply_filter(self):
         """Apply FIR filter only on inputs."""
         self.apriori = self.apriori.get(str(self)) if isinstance(self.apriori, dict) else self.apriori
-        logging.info(f"{self.station}: call ClimateFIRFilter")
+        logging.info(f"{self.station[0]}: call ClimateFIRFilter")
         climate_filter = ClimateFIRFilter(self.input_data.astype("float32"), self.fs, self.filter_order,
                                           self.filter_cutoff_freq,
                                           self.filter_window_type, time_dim=self.time_dim, var_dim=self.target_dim,
...
@@ -97,7 +97,37 @@ class EmptyQueryResult(Exception):
     pass
 
 
-def get_data(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5) -> Union[Dict, List, str]:
+def get_data_with_query(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5, timeout_base=60) -> bytes:
+    """
+    Download data from statistics rest api. This API is based on three steps: (1) post query and retrieve job id, (2)
+    read status of id until finished, (3) download data with job id.
+    """
+    url = create_url(**opts)
+    response_error = None
+    for retry in range(max_retries + 1):
+        time.sleep(random.random())
+        try:
+            timeout = timeout_base * (2 ** retry)
+            logging.info(f"connect (retry={retry}, timeout={timeout}) {url}")
+            start_time = time.time()
+            with TimeTracking(name=url):
+                session = retries_session(max_retries=0)
+                response = session.get(url, headers=headers, timeout=(5, 5))  # timeout=(open, read)
+                while (time.time() - start_time) < timeout:
+                    response = requests.get(response.json()["status"], timeout=(5, 5))
+                    if response.history:
+                        break
+                    time.sleep(2)
+                return response.content
+        except Exception as e:
+            time.sleep(retry)
+            logging.debug(f"There was an error for request {url}: {e}")
+            response_error = e
+    if retry + 1 >= max_retries:
+        raise EmptyQueryResult(f"There was an RetryError for request {url}: {response_error}")
+
+
+def get_data(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5, timeout_base=60) -> Union[Dict, List, str]:
     """
     Download join data using requests framework.
@@ -110,10 +140,11 @@ def get_data(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5) -> Union[Dict, List, str]:
     :return: requested data (either as list or dictionary)
     """
     url = create_url(**opts)
-    for retry in range(max_retries):
+    response_error = None
+    for retry in range(max_retries + 1):
         time.sleep(random.random())
         try:
-            timeout = 60 * (2 ** retry)
+            timeout = timeout_base * (2 ** retry)
             logging.info(f"connect (retry={retry}, timeout={timeout}) {url}")
             with TimeTracking(name=url):
                 session = retries_session(max_retries=0)
@@ -122,11 +153,13 @@ def get_data(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5) -> Union[Dict, List, str]:
                     return response.json() if as_json is True else response.text
                 else:
                     logging.debug(f"There was an error (STATUS {response.status_code}) for request {url}")
+                    response_error = f"STATUS {response.status_code}"
         except Exception as e:
             time.sleep(2 * (2 ** retry))
             logging.debug(f"There was an error for request {url}: {e}")
-        if retry + 1 >= max_retries:
-            raise EmptyQueryResult(f"There was an RetryError for request {url}: {e}")
+            response_error = e
+    if retry + 1 >= max_retries:
+        raise EmptyQueryResult(f"There was an RetryError for request {url}: {response_error}")
 
 
 def correct_stat_name(stat: str) -> str:
...
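The new get_data_with_query wraps the asynchronous statistics endpoint: the first request registers the query, the JSON answer carries a status URL, and that URL is polled until it redirects (response.history becomes non-empty) to the finished product, whose raw bytes are returned. A simplified, self-contained sketch of that poll loop using only the requests library (create_url, retries_session and TimeTracking are project helpers and stay as shown in the diff; the function name below is hypothetical):

    import time
    import requests

    def poll_statistics_job(query_url: str, headers: dict, timeout: int = 60) -> bytes:
        """Stand-alone sketch of the three-step query/poll/download pattern used above."""
        # (1) send the query and receive a status url in the json body
        response = requests.get(query_url, headers=headers, timeout=(5, 5))
        status_url = response.json()["status"]
        start_time = time.time()
        # (2) poll the status url until it redirects to the finished result
        while (time.time() - start_time) < timeout:
            response = requests.get(status_url, timeout=(5, 5))
            if response.history:  # a redirect means the job is done
                break
            time.sleep(2)
        # (3) return the downloaded payload (for daily statistics, a zipped csv)
        return response.content

In the diff, this loop is additionally wrapped in the same exponential backoff that get_data already uses (timeout_base * 2 ** retry per attempt), and a final failure is reported as EmptyQueryResult instead of the raw exception.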
@@ -10,10 +10,12 @@ from io import StringIO
 import pandas as pd
 import pytz
 from timezonefinder import TimezoneFinder
+from io import BytesIO
+import zipfile
 
 from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
 from mlair.helpers import to_list
-from mlair.helpers.data_sources.data_loader import EmptyQueryResult, get_data, correct_stat_name
+from mlair.helpers.data_sources.data_loader import EmptyQueryResult, get_data, correct_stat_name, get_data_with_query
 
 str_or_none = Union[str, None]
@@ -120,9 +122,9 @@ def prepare_meta(meta, sampling, stat_var, var):
     for m in meta:
         opts = {}
         if sampling == "daily":
-            opts["timeseries_id"] = m.pop("id")
+            opts["id"] = m.pop("id")
             m["id"] = None
-            opts["names"] = stat_var[var]
+            opts["statistics"] = stat_var[var]
         opts["sampling"] = sampling
         out.append(([m], opts))
     return out
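The renamed keys are presumably the query parameters the new statistics endpoint expects: "id" selects the timeseries and "statistics" names the requested aggregation, and load_timeseries_data later reads the same key back via opts.get("statistics", "value"). A hypothetical example of one opts entry produced for daily sampling (id and statistic are made-up values):

    # one opts dict as built above for a daily request; create_url turns it into the query string
    opts = {"id": 12345, "statistics": "dma8epa_strict", "sampling": "daily"}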
@@ -167,17 +169,32 @@ def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling):
         series_id = meta["id"]
         # opts = {"base": url_base, "service": f"data/timeseries/{series_id}"}
         opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv", **opts}
-        if sampling != "hourly":
+        if sampling == "hourly":
+            res = get_data(opts, headers, as_json=False)
+            data = extract_timeseries_data(res, "string")
+        else:
             opts["service"] = None
-        res = get_data(opts, headers, as_json=False)
-        data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True,
-                           infer_datetime_format=True)
+            opts["format"] = None
+            res = get_data_with_query(opts, headers, as_json=False)
+            data = extract_timeseries_data(res, "bytes")
         if len(data.index) > 0:
-            data = data[correct_stat_name(opts.get("names", "value"))].rename(meta["variable"]["name"])
+            data = data[correct_stat_name(opts.get("statistics", "value"))].rename(meta["variable"]["name"])
         coll.append(data)
     return coll
 
 
+def extract_timeseries_data(result, result_format):
+    if result_format == "string":
+        return pd.read_csv(StringIO(result), comment="#", index_col="datetime", parse_dates=True,
+                           infer_datetime_format=True)
+    elif result_format == "bytes":
+        with zipfile.ZipFile(BytesIO(result)) as file:
+            return pd.read_csv(BytesIO(file.read(file.filelist[0].filename)), comment="#", index_col="datetime",
+                               parse_dates=True)
+    else:
+        raise ValueError(f"Unknown result format given: {result_format}")
+
+
 def load_station_information(station_name: List[str], url_base: str, headers: Dict):
     # opts = {"base": url_base, "service": f"stationmeta/{station_name[0]}"}
     opts = {"base": url_base, "service": f"stationmeta", "param_id": station_name[0]}
...
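Daily statistics now come back as a zip archive of CSV bytes (hence the new BytesIO and zipfile imports), while hourly data is still a plain CSV string; extract_timeseries_data dispatches on that difference. A small self-contained check of the bytes path, building an in-memory zip shaped the way such a response is assumed to look (the column name and values are illustrative):

    import zipfile
    from io import BytesIO

    import pandas as pd

    # fabricate a zipped csv payload resembling a daily statistics download
    csv_body = "datetime,dma8epa_strict\n2017-01-01,42.0\n2017-01-02,40.5\n"
    buffer = BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr("result.csv", csv_body)
    payload = buffer.getvalue()

    # same unpacking steps as extract_timeseries_data(result, "bytes")
    with zipfile.ZipFile(BytesIO(payload)) as file:
        frame = pd.read_csv(BytesIO(file.read(file.filelist[0].filename)),
                            comment="#", index_col="datetime", parse_dates=True)
    print(frame)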