Commit 067a83c8 authored by Simon Grasse

implement status URL caching for requests with identical parameters

parent ee96dc28
1 merge request: !11 Creation of first beta release version
import time
import io
from zipfile import ZipFile
from dataclasses import dataclass
from dataclasses import dataclass, asdict
from contextlib import contextmanager
import json
import requests
import pandas as pd
@@ -14,13 +16,122 @@ STATION_LON = "station_coordinates_lng"
COORDS = [STATION_LAT, STATION_LON]
@dataclass(frozen=True)
class QueryOptions:
    daterange: str
    variable_id: str
    statistics: str
    sampling: str
    # annotated with defaults so that asdict() includes them as query parameters
    min_data_capture: str = "0"  # TODO check effect on NaNs
    metadata_scheme: str = "basic"
    limit: str = "None"
    format: str = "by_statistic"

    @staticmethod
    def from_metadata(metadata: Metadata):
        return QueryOptions(
            daterange=metadata.time.daterange_option,
            variable_id=str(metadata.variable.toar_id),
            statistics=metadata.statistic,
            sampling=metadata.time.sampling,
        )

    @property
    def cache_key(self):
        return "".join(asdict(self).values())
class Cache:
    def __init__(self, cache_dir):
        self.cache_file = cache_dir / "status_endpoints.json"
        if not self.cache_file.is_file():  # initialize cache with an empty JSON object
            with open(self.cache_file, "w") as cache:
                json.dump({}, cache)

    def __contains__(self, item: str):
        with self.storage_dict() as storage:
            return item in storage

    def get(self, key: str):
        with self.storage_dict() as storage:
            return storage[key]

    def put(self, key: str, content: str):
        with self.storage_dict() as storage:
            storage[key] = content

    def remove(self, key: str):
        with self.storage_dict() as storage:
            del storage[key]

    @contextmanager
    def storage_dict(self):
        with open(self.cache_file, "r") as cache:  # setup: load persisted entries
            storage_dict = json.load(cache)
        yield storage_dict
        with open(self.cache_file, "w") as cache:  # teardown: persist any changes
            json.dump(storage_dict, cache)
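
# Round-trip sketch (hypothetical key and URL, not part of this commit). Every
# operation re-reads and re-writes the whole JSON file via storage_dict(), so
# entries persist across interpreter sessions:
#
#   cache = Cache(Path("/tmp"))  # creates /tmp/status_endpoints.json if absent
#   cache.put("somekey", "https://example.org/status/abc")
#   "somekey" in cache           # -> True
#   cache.get("somekey")         # -> "https://example.org/status/abc"
#   cache.remove("somekey")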
class Connection:
    def __init__(self, endpoint, cache_dir):
        self.endpoint = endpoint
        self.cache = Cache(cache_dir)
        # max wait time is 30min
        self.wait_seconds = [minutes * 60 for minutes in (1, 2, 2, 5, 10, 10)]

    def get(self, query_options):
        status_endpoint = self.get_status_endpoint(query_options)
        for i, wait_time in enumerate(self.wait_seconds):
            print(f"try: {i + 1}, wait_time: {wait_time}")
            response = self.wait_and_get(status_endpoint, wait_secs=wait_time)
            if response.headers["Content-Type"] == "application/zip":
                return response
        # all retries exhausted without a zip payload
        raise RuntimeError(
            f"No data available after {sum(self.wait_seconds) / 60} minutes. Retry later."
        )

    def get_status_endpoint(self, query_options: QueryOptions):
        if query_options.cache_key in self.cache:
            status_endpoint = self.cache.get(query_options.cache_key)
            try:  # test for stale cache entry
                self.wait_and_get(status_endpoint).raise_for_status()
            except requests.exceptions.HTTPError:
                self.cache.remove(query_options.cache_key)
            else:
                print("load status endpoint from cache")
                return status_endpoint
        status_endpoint = self.query_for_status_endpoint(query_options)
        return status_endpoint

    def query_for_status_endpoint(self, query_options: QueryOptions):
        response = self.wait_and_get(self.endpoint, asdict(query_options))
        status_endpoint = response.json()["status"]
        self.cache.put(query_options.cache_key, status_endpoint)
        return status_endpoint

    def wait_and_get(
        self, endpoint, query_options=None, wait_secs=None, timeout=(3.05, 5)
    ):
        # timeout is requests' (connect, read) tuple in seconds
        if wait_secs:
            time.sleep(wait_secs)
        return requests.get(endpoint, params=query_options, timeout=timeout)
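
# Flow sketch (endpoint URL is an assumption, not from this commit): the first
# call registers the query and caches the returned status URL; a second call
# with identical options skips registration, re-validates the cached URL, and
# goes straight to polling:
#
#   conn = Connection("https://toar-data.fz-juelich.de/api/v2/analysis/statistics", cache_dir)
#   response = conn.get(QueryOptions.from_metadata(metadata))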
class AnalysisService:
    METADATA = "metadata"

    def __init__(self, stats_endpoint):
        self.stats_endpoint = stats_endpoint
        self.wait_time = 30
        self.max_response_time = 60 * 30  # 30 min

    def __init__(self, stats_endpoint, cache_dir):
        self.connection = Connection(stats_endpoint, cache_dir)

    def get_data(self, metadata: Metadata) -> AnalysisRequestResult:
        timeseries, timeseries_metadata = self.get_timeseries_and_metadata(metadata)
@@ -28,6 +139,12 @@ class AnalysisService:
        timeseries = self.get_clean_timeseries(timeseries, metadata)
        return AnalysisRequestResult(timeseries, coords, metadata)
    def get_timeseries_and_metadata(self, metadata: Metadata):
        query_options = QueryOptions.from_metadata(metadata)
        # Connection.get returns the polled response; load_data expects the raw
        # zip payload, hence .content
        content = self.connection.get(query_options).content
        timeseries, timeseries_metadata = self.load_data(content, metadata)
        return timeseries, timeseries_metadata
    def get_clean_coords(self, timeseries_metadata):
        coords = timeseries_metadata[COORDS]
        coords.columns = [Coordinates.latitude.name, Coordinates.longitude.name]
@@ -47,40 +164,6 @@ class AnalysisService:
        timeseries.columns = metadata.time.as_datetime_index()
        return timeseries
    def get_timeseries_and_metadata(self, metadata: Metadata):
        query_options = self.get_query_options(metadata)
        content = self.query_timeseries_and_metadata(query_options)
        timeseries, timeseries_metadata = self.load_data(content, metadata)
        return timeseries, timeseries_metadata

    def get_query_options(self, metadata: Metadata):
        return {
            "daterange": metadata.time.daterange_option,
            "variable_id": metadata.variable.toar_id,
            "statistics": metadata.statistic,
            "sampling": metadata.time.sampling,
            "min_data_capture": 0,  # TODO check effect on NaNs
            "metadata_scheme": "basic",
            "limit": "None",
            "format": "by_statistic",
        }

    def query_timeseries_and_metadata(self, query_options, metadata) -> bytes:
        response = requests.get(self.stats_endpoint, params=query_options)
        return self.wait_for_data(response.json()["status"])  # TODO str

    def wait_for_data(self, status_endpoint):
        # TODO proper waiting
        for total_wait_time in range(0, self.max_response_time, self.wait_time):
            time.sleep(self.wait_time)
            response = requests.get(status_endpoint)  # TODO handle server errors
            if response.headers["Content-Type"] == "application/zip":
                return response.content
        else:
            raise Exception(f"Time Out after {self.max_response_time}")
    def load_data(
        self, content: bytes, metadata: Metadata
    ) -> (pd.DataFrame, pd.DataFrame):
......
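
# End-to-end sketch (hypothetical names and paths, not part of this commit):
# with the cache in place, repeating get_data() for unchanged Metadata reuses
# the stored status URL instead of registering the query a second time.
#
#   from pathlib import Path
#   service = AnalysisService(stats_endpoint, cache_dir=Path("cache"))
#   result = service.get_data(metadata)  # registers query, caches status URL
#   result = service.get_data(metadata)  # cache hit: only polls for the zip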