diff --git a/toargridding/toar_rest_client.py b/toargridding/toar_rest_client.py
index 10787f85219518c8f4bab30df8bc224e1ecb9622..dac2f1377b9f8c73bab8122be665504bf3b7f28a 100644
--- a/toargridding/toar_rest_client.py
+++ b/toargridding/toar_rest_client.py
@@ -1,7 +1,9 @@
 import time
 import io
 from zipfile import ZipFile
-from dataclasses import dataclass
+from dataclasses import dataclass, asdict
+from contextlib import contextmanager
+import json
 
 import requests
 import pandas as pd
@@ -14,13 +16,122 @@
 STATION_LON = "station_coordinates_lng"
 COORDS = [STATION_LAT, STATION_LON]
 
+@dataclass(frozen=True)
+class QueryOptions:
+    daterange: str
+    variable_id: str
+    statistics: str
+    sampling: str
+    min_data_capture: str = "0"  # TODO check effect on NaNs
+    metadata_scheme: str = "basic"
+    limit: str = "None"
+    format: str = "by_statistic"
+
+    @staticmethod
+    def from_metadata(metadata: Metadata):
+        return QueryOptions(
+            daterange=metadata.time.daterange_option,
+            variable_id=str(metadata.variable.toar_id),
+            statistics=metadata.statistic,
+            sampling=metadata.time.sampling,
+        )
+
+    @property
+    def cache_key(self):
+        return "".join(asdict(self).values())
+
+
+class Cache:
+    def __init__(self, cache_dir):
+        self.cache_file = cache_dir / "status_endpoints.json"
+
+        if not self.cache_file.is_file():  # initialize cache
+            with open(self.cache_file, "w") as cache:
+                json.dump({}, cache)
+
+    def __contains__(self, item):
+        with self.storage_dict() as storage:
+            return item in storage.keys()
+
+    def get(self, key: str):
+        with self.storage_dict() as storage:
+            return storage[key]
+
+    def put(self, key: str, content: str):
+        with self.storage_dict() as storage:
+            storage[key] = content
+
+    def remove(self, key: str):
+        with self.storage_dict() as storage:
+            del storage[key]
+
+    @contextmanager
+    def storage_dict(self):
+        with open(self.cache_file, "r") as cache:  # setup
+            storage_dict = json.load(cache)
+
+        yield storage_dict
+
+        with open(self.cache_file, "w") as cache:  # teardown
+            json.dump(storage_dict, cache)
+
+
+class Connection:
+    def __init__(self, endpoint, cache_dir):
+        self.endpoint = endpoint
+        self.cache = Cache(cache_dir)
+        # max wait time is 30min
+        self.wait_seconds = [minutes * 60 for minutes in (1, 2, 2, 5, 10, 10)]
+
+    def get(self, query_options):
+        status_endpoint = self.get_status_endpoint(query_options)
+
+        for i, wait_time in enumerate(self.wait_seconds):
+            print(f"try: {i+1}, wait_time: {wait_time}")
+            response = self.wait_and_get(status_endpoint, wait_secs=wait_time)
+            if response.headers["Content-Type"] == "application/zip":
+                return response.content
+        else:
+            raise RuntimeError(
+                f"No data available after {sum(self.wait_seconds) / 60} minutes. retry later."
+            )
+
+    def get_status_endpoint(self, query_options: QueryOptions):
+        if query_options.cache_key in self.cache:
+            status_endpoint = self.cache.get(query_options.cache_key)
+
+            try:  # test for stale cache
+                self.wait_and_get(status_endpoint).raise_for_status()
+            except requests.exceptions.HTTPError:
+                self.cache.remove(query_options.cache_key)
+            else:
+                print("load status endpoint from cache")
+                return status_endpoint
+
+        status_endpoint = self.query_for_status_endpoint(query_options)
+        return status_endpoint
+
+    def query_for_status_endpoint(self, query_options: QueryOptions):
+        response = self.wait_and_get(self.endpoint, asdict(query_options))
+        status_endpoint = response.json()["status"]
+        self.cache.put(query_options.cache_key, status_endpoint)
+
+        return status_endpoint
+
+    def wait_and_get(
+        self, endpoint, query_options=None, wait_secs=None, timeout=(3.05, 5)
+    ):
+        if wait_secs:
+            time.sleep(wait_secs)
+
+        return requests.get(endpoint, params=query_options, timeout=timeout)
+
+
 class AnalysisService:
     METADATA = "metadata"
 
-    def __init__(self, stats_endpoint):
-        self.stats_endpoint = stats_endpoint
-        self.wait_time = 30
-        self.max_response_time = 60 * 30  # 30 min
+    def __init__(self, stats_endpoint, cache_dir):
+        self.connection = Connection(stats_endpoint, cache_dir)
 
     def get_data(self, metadata: Metadata) -> AnalysisRequestResult:
         timeseries, timeseries_metadata = self.get_timeseries_and_metadata(metadata)
@@ -28,6 +139,12 @@ class AnalysisService:
         timeseries = self.get_clean_timeseries(timeseries, metadata)
         return AnalysisRequestResult(timeseries, coords, metadata)
 
+    def get_timeseries_and_metadata(self, metadata: Metadata):
+        query_options = QueryOptions.from_metadata(metadata)
+        content = self.connection.get(query_options)
+        timeseries, timeseries_metadata = self.load_data(content, metadata)
+        return timeseries, timeseries_metadata
+
     def get_clean_coords(self, timeseries_metadata):
         coords = timeseries_metadata[COORDS]
         coords.columns = [Coordinates.latitude.name, Coordinates.longitude.name]
@@ -47,40 +164,6 @@ class AnalysisService:
         timeseries.columns = metadata.time.as_datetime_index()
         return timeseries
 
-    def get_timeseries_and_metadata(self, metadata: Metadata):
-        query_options = self.get_query_options(metadata)
-        content = self.query_timeseries_and_metadata(query_options)
-        timeseries, timeseries_metadata = self.load_data(content, metadata)
-        return timeseries, timeseries_metadata
-
-    def get_query_options(self, metadata: Metadata):
-        return {
-            "daterange": metadata.time.daterange_option,
-            "variable_id": metadata.variable.toar_id,
-            "statistics": metadata.statistic,
-            "sampling": metadata.time.sampling,
-            "min_data_capture": 0,  # TODO check effect on NaNs
-            "metadata_scheme": "basic",
-            "limit": "None",
-            "format": "by_statistic",
-        }
-
-    def query_timeseries_and_metadata(self, query_options, metadata) -> bytes:
-        response = requests.get(self.stats_endpoint, params=query_options)
-        return self.wait_for_data(response.json()["status"])  # TODO str
-
-    def wait_for_data(self, status_endpoint):
-        # TODO proper waiting
-        for total_wait_time in range(0, self.max_response_time, self.wait_time):
-            time.sleep(self.wait_time)
-
-            response = requests.get(status_endpoint)  # TODO handle server errors
-
-            if response.headers["Content-Type"] == "application/zip":
-                return response.content
-        else:
-            raise Exception(f"Time Out after {self.max_response_time}")
-
     def load_data(
         self, content: bytes, metadata: Metadata
     ) -> (pd.DataFrame, pd.DataFrame):