diff --git a/mlair/configuration/toar_data_v2_settings.py b/mlair/configuration/toar_data_v2_settings.py
index a8bb9f42cf5a1967f150aa18019c2dbdc89f43a2..da17b90d32088431566f93ae951545ff5a168079 100644
--- a/mlair/configuration/toar_data_v2_settings.py
+++ b/mlair/configuration/toar_data_v2_settings.py
@@ -10,7 +10,7 @@ def toar_data_v2_settings(sampling="daily") -> Tuple[str, Dict]:
     :return: Service url and optional headers
     """
     if sampling == "daily":  # pragma: no branch
-        TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/statistics/api/v1/"
+        TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2/analysis/statistics/"
         headers = {}
     elif sampling == "hourly" or sampling == "meta":
         TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2/"
diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py
index 0be52e937b963a9c277992c57e00f9db282f48a5..724d8c8f54423bbc0311cb9775463d1eba9b044e 100644
--- a/mlair/data_handler/data_handler_single_station.py
+++ b/mlair/data_handler/data_handler_single_station.py
@@ -331,7 +331,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
         file_name = self._set_file_name(path, station, statistics_per_var)
         meta_file = self._set_meta_file_name(path, station, statistics_per_var)
         if self.overwrite_local_data is True:
-            logging.debug(f"overwrite_local_data is true, therefore reload {file_name}")
+            logging.debug(f"{self.station[0]}: overwrite_local_data is true, therefore reload {file_name}")
             if os.path.exists(file_name):
                 os.remove(file_name)
             if os.path.exists(meta_file):
@@ -339,22 +339,22 @@ class DataHandlerSingleStation(AbstractDataHandler):
             data, meta = data_sources.download_data(file_name, meta_file, station, statistics_per_var, sampling,
                                                     store_data_locally=store_data_locally, data_origin=data_origin,
                                                     time_dim=self.time_dim, target_dim=self.target_dim,
                                                     iter_dim=self.iter_dim)
-            logging.debug(f"loaded new data")
+            logging.debug(f"{self.station[0]}: loaded new data")
         else:
             try:
-                logging.debug(f"try to load local data from: {file_name}")
+                logging.debug(f"{self.station[0]}: try to load local data from: {file_name}")
                 data = xr.open_dataarray(file_name)
                 meta = pd.read_csv(meta_file, index_col=0)
                 self.check_station_meta(meta, station, data_origin, statistics_per_var)
-                logging.debug("loading finished")
+                logging.debug(f"{self.station[0]}: loading finished")
             except FileNotFoundError as e:
-                logging.debug(e)
-                logging.debug(f"load new data")
+                logging.debug(f"{self.station[0]}: {e}")
+                logging.debug(f"{self.station[0]}: load new data")
                 data, meta = data_sources.download_data(file_name, meta_file, station, statistics_per_var, sampling,
                                                         store_data_locally=store_data_locally, data_origin=data_origin,
                                                         time_dim=self.time_dim, target_dim=self.target_dim,
                                                         iter_dim=self.iter_dim)
-                logging.debug("loading finished")
+                logging.debug(f"{self.station[0]}: loading finished")
         # create slices and check for negative concentration.
         data = self._slice_prep(data, start=start, end=end)
         data = self.check_for_negative_concentrations(data)
@@ -372,7 +372,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
             if v is None or k not in meta.index:
                 continue
             if meta.at[k, station[0]] != v:
-                logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
+                logging.debug(f"{station[0]}: meta data does not agree with given request for {k}: {v} (requested) != "
                               f"{meta.at[k, station[0]]} (local). Raise FileNotFoundError to trigger new "
                               f"grapping from web.")
                 raise FileNotFoundError
diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py
index e5760e9afb52f9d55071214fb632601d744f124e..8c28faab29aa40f1aac7adcac4292626dd2d0343 100644
--- a/mlair/data_handler/data_handler_with_filter.py
+++ b/mlair/data_handler/data_handler_with_filter.py
@@ -374,7 +374,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation
     def apply_filter(self):
         """Apply FIR filter only on inputs."""
         self.apriori = self.apriori.get(str(self)) if isinstance(self.apriori, dict) else self.apriori
-        logging.info(f"{self.station}: call ClimateFIRFilter")
+        logging.info(f"{self.station[0]}: call ClimateFIRFilter")
         climate_filter = ClimateFIRFilter(self.input_data.astype("float32"), self.fs, self.filter_order,
                                           self.filter_cutoff_freq,
                                           self.filter_window_type, time_dim=self.time_dim, var_dim=self.target_dim,
diff --git a/mlair/helpers/data_sources/data_loader.py b/mlair/helpers/data_sources/data_loader.py
index 7131c6b3fa4f340715c53e94163ce3e67ec40003..e906acac28d29871d3cef2ec377d1ca2da3ae1cf 100644
--- a/mlair/helpers/data_sources/data_loader.py
+++ b/mlair/helpers/data_sources/data_loader.py
@@ -85,6 +85,36 @@ class EmptyQueryResult(Exception):
     pass
 
 
+def get_data_with_query(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5, timeout_base=60) -> bytes:
+    """
+    Download data from statistics rest api. This API is based on three steps: (1) post query and retrieve job id, (2)
+    read status of id until finished, (3) download data with job id.
+    """
+    url = create_url(**opts)
+    response_error = None
+    for retry in range(max_retries + 1):
+        time.sleep(random.random())
+        try:
+            timeout = timeout_base * (2 ** retry)
+            logging.info(f"connect (retry={retry}, timeout={timeout}) {url}")
+            start_time = time.time()
+            with TimeTracking(name=url):
+                session = retries_session(max_retries=0)
+                response = session.get(url, headers=headers, timeout=(5, 5))  # timeout=(open, read)
+                while (time.time() - start_time) < timeout:
+                    response = requests.get(response.json()["status"], timeout=(5, 5))
+                    if response.history:
+                        break
+                    time.sleep(2)
+                return response.content
+        except Exception as e:
+            time.sleep(retry)
+            logging.debug(f"There was an error for request {url}: {e}")
+            response_error = e
+            if retry + 1 >= max_retries:
+                raise EmptyQueryResult(f"There was an RetryError for request {url}: {response_error}")
+
+
 def get_data(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5, timeout_base=60) -> Union[Dict, List, str]:
     """
     Download join data using requests framework.
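
The new get_data_with_query wraps the asynchronous workflow of the statistics endpoint: submit the query, poll the status URL the service returns, and read the payload once the poll is redirected to the finished result. The sketch below shows only that submit-poll-download pattern in isolation; the query URL is a placeholder, the "status" field and the redirect check mirror what the function above expects from the service, and the retry/back-off and TimeTracking wrapping from data_loader.py are left out.

import time

import requests

# Placeholder query URL; in data_loader.py the real URL is built by create_url(**opts)
# from the TOAR statistics service settings.
QUERY_URL = "https://example.org/api/v2/analysis/statistics/?id=42&statistics=dma8eu"


def download_statistics(query_url, poll_interval=2, timeout=60):
    """Submit a statistics query, poll its status URL, and return the raw result payload."""
    # (1) submit the query; the service answers with a job description containing a status URL
    job = requests.get(query_url, timeout=(5, 5)).json()
    status_url = job["status"]
    # (2) poll the status URL; a finished job redirects to the result, which shows up as a
    #     non-empty redirect history on the response
    deadline = time.time() + timeout
    while time.time() < deadline:
        response = requests.get(status_url, timeout=(5, 5))
        if response.history:
            # (3) the redirected response already carries the result (zipped csv) as bytes
            return response.content
        time.sleep(poll_interval)
    raise TimeoutError(f"statistics job did not finish within {timeout} s: {status_url}")
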
diff --git a/mlair/helpers/data_sources/toar_data_v2.py b/mlair/helpers/data_sources/toar_data_v2.py
index 5d1cacc604f4288e48d12a72f8a24ba0d8b21fd1..3f2bc79d2bf3143452b30305692dd00f550ed930 100644
--- a/mlair/helpers/data_sources/toar_data_v2.py
+++ b/mlair/helpers/data_sources/toar_data_v2.py
@@ -10,10 +10,12 @@ from io import StringIO
 import pandas as pd
 import pytz
 from timezonefinder import TimezoneFinder
+from io import BytesIO
+import zipfile
 
 from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
 from mlair.helpers import to_list
-from mlair.helpers.data_sources.data_loader import EmptyQueryResult, get_data, correct_stat_name
+from mlair.helpers.data_sources.data_loader import EmptyQueryResult, get_data, correct_stat_name, get_data_with_query
 
 str_or_none = Union[str, None]
 
@@ -120,9 +122,9 @@ def prepare_meta(meta, sampling, stat_var, var):
     for m in meta:
         opts = {}
         if sampling == "daily":
-            opts["timeseries_id"] = m.pop("id")
+            opts["id"] = m.pop("id")
             m["id"] = None
-            opts["names"] = stat_var[var]
+            opts["statistics"] = stat_var[var]
             opts["sampling"] = sampling
         out.append(([m], opts))
     return out
@@ -167,17 +169,32 @@ def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling):
         series_id = meta["id"]
         # opts = {"base": url_base, "service": f"data/timeseries/{series_id}"}
         opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv", **opts}
-        if sampling != "hourly":
+        if sampling == "hourly":
+            res = get_data(opts, headers, as_json=False)
+            data = extract_timeseries_data(res, "string")
+        else:
             opts["service"] = None
-        res = get_data(opts, headers, as_json=False)
-        data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True,
-                           infer_datetime_format=True)
+            opts["format"] = None
+            res = get_data_with_query(opts, headers, as_json=False)
+            data = extract_timeseries_data(res, "bytes")
         if len(data.index) > 0:
-            data = data[correct_stat_name(opts.get("names", "value"))].rename(meta["variable"]["name"])
+            data = data[correct_stat_name(opts.get("statistics", "value"))].rename(meta["variable"]["name"])
             coll.append(data)
     return coll
 
 
+def extract_timeseries_data(result, result_format):
+    if result_format == "string":
+        return pd.read_csv(StringIO(result), comment="#", index_col="datetime", parse_dates=True,
+                           infer_datetime_format=True)
+    elif result_format == "bytes":
+        with zipfile.ZipFile(BytesIO(result)) as file:
+            return pd.read_csv(BytesIO(file.read(file.filelist[0].filename)), comment="#", index_col="datetime",
+                               parse_dates=True)
+    else:
+        raise ValueError(f"Unknown result format given: {result_format}")
+
+
 def load_station_information(station_name: List[str], url_base: str, headers: Dict):
     # opts = {"base": url_base, "service": f"stationmeta/{station_name[0]}"}
     opts = {"base": url_base, "service": f"stationmeta", "param_id": station_name[0]}
diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py
index 5710b63336b5c3e505363b90215a8cb631c3da22..d56d064ad618b1dbae7b9c8b08a5887e8577dcbe 100644
--- a/mlair/run_modules/pre_processing.py
+++ b/mlair/run_modules/pre_processing.py
@@ -295,11 +295,12 @@ class PreProcessing(RunEnvironment):
         else:  # serial solution
             logging.info("use serial validate station approach")
             kwargs.update({"return_strategy": "result"})
-            for station in set_stations:
+            for i, station in enumerate(set_stations):
                 dh, s = f_proc(data_handler, station, set_name, store_processed_data, **kwargs)
                 if dh is not None:
                     collection.add(dh)
                     valid_stations.append(s)
+                    logging.info(f"...finished: {s} ({int((i + 1.) / len(set_stations) * 100)}%)")
 
         logging.info(f"run for {t_outer} to check {len(set_stations)} station(s). Found {len(collection)}/"
                      f"{len(set_stations)} valid stations ({set_name}).")
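
For daily statistics the payload delivered by this workflow is a zip archive holding a single csv file, which is why extract_timeseries_data in toar_data_v2.py gained a "bytes" branch. A self-contained sketch of that unpacking step, using an in-memory archive with made-up values instead of a real download:

import zipfile
from io import BytesIO

import pandas as pd

# Build a small in-memory zip archive that mimics the downloaded payload: one csv member
# with a datetime index column (values are made up for the example).
csv_payload = "datetime,dma8eu\n2017-01-01,42.1\n2017-01-02,40.3\n"
buffer = BytesIO()
with zipfile.ZipFile(buffer, mode="w") as archive:
    archive.writestr("timeseries.csv", csv_payload)
raw_bytes = buffer.getvalue()

# Same pattern as the "bytes" branch of extract_timeseries_data: open the archive from
# bytes, read its first member, and parse it with pandas.
with zipfile.ZipFile(BytesIO(raw_bytes)) as file:
    data = pd.read_csv(BytesIO(file.read(file.filelist[0].filename)), comment="#",
                       index_col="datetime", parse_dates=True)
print(data)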