Commit 9a4d9758 authored by Carsten Hinz

notebook examples:

- changed from one large request to yearly requests
- changed the waiting time for individual requests
- cleaned up

toar_rest_client:

- added a function to alter the waiting intervals before checking for the results
- added indentation to the cache file
- worked on the debug output
parent a9a0c089
Merge request !11: Creation of first beta release version
%% Cell type:code id: tags:

``` python
from datetime import datetime as dt
from collections import namedtuple
from pathlib import Path

from toargridding.toar_rest_client import AnalysisServiceDownload, Connection
from toargridding.grids import RegularGrid
from toargridding.gridding import get_gridded_toar_data
from toargridding.metadata import TimeSample
```
%% Cell type:code id: tags:

``` python
# creation of the request
Config = namedtuple("Config", ["grid", "time", "variables", "stats"])

grid = RegularGrid(lat_resolution=1.9, lon_resolution=2.5)

configs = dict()
for year in range(0, 19):
    valid_data = Config(
        grid,
        TimeSample(start=dt(2000 + year, 1, 1), end=dt(2000 + year, 12, 31), sampling="daily"),  # possibly adapt range :-)
        ["mole_fraction_of_ozone_in_air"],  # variable name
        ["dma8epax"],  # change to dma8epa_strict
    )
    configs[f"test_ta{year}"] = valid_data

# testing access:
#config = configs["test_ta0"]
#config.grid
```
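For orientation, a minimal sketch (not part of the notebook) of what the loop above produces: one Config per year from 2000 to 2018, keyed test_ta0 through test_ta18, each covering a full calendar year. It assumes TimeSample exposes its start and end as attributes:

``` python
# hypothetical check (assumes TimeSample stores start/end as datetime attributes)
for name, cfg in list(configs.items())[:2]:
    print(name, cfg.time.start.date(), cfg.time.end.date())
# expected output:
# test_ta0 2000-01-01 2000-12-31
# test_ta1 2001-01-01 2001-12-31
```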
%% Cell type:code id: tags:

``` python
# CAVE: the request takes over 30 min per requested year. Therefore this cell needs to be
# executed at different times to check if the results are ready for download.
# The processing is done on the server of the TOAR database.
# A restart of the cell resumes the request to the REST API; once the requested data are ready, they are downloaded.
# The download can also take a few minutes.
stats_endpoint = "https://toar-data.fz-juelich.de/api/v2/analysis/statistics/"

cache_basepath = Path("cache")
result_basepath = Path("results")
cache_basepath.mkdir(exist_ok=True)
result_basepath.mkdir(exist_ok=True)

analysis_service = AnalysisServiceDownload(stats_endpoint=stats_endpoint, cache_dir=cache_basepath, sample_dir=result_basepath, use_downloaded=True)
Connection.DEBUG = True

# here we adapt the waiting durations before a request is stopped;
# the default maximum waiting time is 30 minutes.
# waiting up to 3 h for one request:
analysis_service.connection.setRequestTimes(interval_min=45, maxWait_min=3*60)

for person, config in configs.items():
    print(f"\nProcessing {person}:")
    print("--------------------")
    datasets, metadatas = get_gridded_toar_data(
        analysis_service=analysis_service,
        grid=config.grid,
        time=config.time,
        variables=config.variables,
        stats=config.stats,
    )

    for dataset, metadata in zip(datasets, metadatas):
        dataset.to_netcdf(result_basepath / f"{metadata.get_id()}_{config.grid.get_id()}.nc")
        print(metadata.get_id())
```
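As a quick sanity check, the intervals configured above translate into the client's list of waits via the list comprehension added to setRequestTimes (see the toar_rest_client diff below). A minimal sketch with the values used in this notebook:

``` python
# setRequestTimes(interval_min=45, maxWait_min=3*60) builds a list of
# 45-minute waits that covers at most three hours in total
interval_min, maxWait_min = 45, 3 * 60
wait_seconds = [interval_min * 60 for _ in range(interval_min, maxWait_min + 1, interval_min)]
print(wait_seconds)       # [2700, 2700, 2700, 2700]
print(sum(wait_seconds))  # 10800 s == 3 h
```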
...
%% Cell type:code id: tags:

``` python
from datetime import datetime as dt
from collections import namedtuple
from pathlib import Path

from toargridding.toar_rest_client import AnalysisServiceDownload, Connection
from toargridding.grids import RegularGrid
from toargridding.gridding import get_gridded_toar_data
from toargridding.metadata import TimeSample
```
%% Cell type:code id: tags:

``` python
# creation of the request
Config = namedtuple("Config", ["grid", "time", "variables", "stats", "moreOptions"])

# moreOptions is implemented as a dict to add additional arguments to the query to the REST API.
# For example, the field toar1_category with its possible values Urban, RuralLowElevation,
# RuralHighElevation and Unclassified can be added (see page 18 in
# https://toar-data.fz-juelich.de/sphinx/TOAR_UG_Vol03_Database/build/latex/toardatabase--userguide.pdf),
# or type_of_area with urban, suburban and rural (page 20) can be used.
details4Query = {
    #"toar1_category" : "Urban",  # uncomment if wished :-)
    #"toar1_category" : "RuralLowElevation",  # uncomment if wished :-)
    #"toar1_category" : "RuralHighElevation",  # uncomment if wished :-)
    #"type_of_area" : "Urban",  # also test Rural, Suburban
    "type_of_area" : "Rural",  # also test Urban, Suburban
    #"type_of_area" : "Suburban",  # also test Urban, Rural
}

grid = RegularGrid(lat_resolution=1.9, lon_resolution=2.5)

configs = dict()
for year in range(0, 19):
    valid_data = Config(
        grid,
        TimeSample(start=dt(2000 + year, 1, 1), end=dt(2000 + year, 12, 31), sampling="daily"),  # possibly adapt range :-)
        ["mole_fraction_of_ozone_in_air"],  # variable name
        #["mean", "dma8epax"],  # will start one request after the other
        ["dma8epa_strict"],
        details4Query,
    )
    configs[f"test_ta{year}"] = valid_data

# testing access:
#config = configs["test_ta2"]
#config.grid
```
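Since moreOptions is simply unpacked into the processing call below via **config.moreOptions, every key in the dict ends up as an additional keyword filter of the REST query. A minimal sketch of that mechanism; query is a hypothetical stand-in for the real call chain, not part of toargridding:

``` python
# hypothetical stand-in: extra keyword arguments are merged into the query
def query(variables, stats, **kwargs):
    print({"variables": variables, "stats": stats, **kwargs})

query(["mole_fraction_of_ozone_in_air"], ["dma8epa_strict"], **{"type_of_area": "Rural"})
# {'variables': ['mole_fraction_of_ozone_in_air'], 'stats': ['dma8epa_strict'], 'type_of_area': 'Rural'}
```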
%% Cell type:code id: tags:

``` python
# CAVE: this cell runs about 45 minutes per requested year. Therefore we increase the waiting duration to 1 h per request.
# The processing is done on the server of the TOAR database.
# A restart of the cell resumes the request to the REST API; once the requested data are ready, they are downloaded.
# The download can also take a few minutes.
stats_endpoint = "https://toar-data.fz-juelich.de/api/v2/analysis/statistics/"

cache_basepath = Path("cache")
result_basepath = Path("results")
cache_basepath.mkdir(exist_ok=True)
result_basepath.mkdir(exist_ok=True)

analysis_service = AnalysisServiceDownload(stats_endpoint=stats_endpoint, cache_dir=cache_basepath, sample_dir=result_basepath, use_downloaded=True)
Connection.DEBUG = True

# maybe adapt the interval for requesting the results and the total duration before the client pauses the requests;
# as the requests take about 45 min, it is more suitable to wait 60 min before timing out than the original 30 min.
analysis_service.connection.setRequestTimes(interval_min=5, maxWait_min=60)

for person, config in configs.items():
    print(f"\nProcessing {person}:")
    print("--------------------")
    datasets, metadatas = get_gridded_toar_data(
        analysis_service=analysis_service,
        grid=config.grid,
        time=config.time,
        variables=config.variables,
        stats=config.stats,
        **config.moreOptions,
    )

    for dataset, metadata in zip(datasets, metadatas):
        dataset.to_netcdf(result_basepath / f"{metadata.get_id()}_{config.grid.get_id()}.nc")
        print(metadata.get_id())
```
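The results are plain NetCDF files, so they can be inspected directly with xarray once the loop has finished. A minimal sketch, assuming at least one file has been written to the results directory:

``` python
import xarray as xr
from pathlib import Path

# open the first gridded result produced by the loop above
first_result = sorted(Path("results").glob("*.nc"))[0]
print(xr.open_dataset(first_result))
```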
...
@@ -149,7 +149,7 @@ class Cache:
         yield storage_dict
         with open(self.cache_file, "w") as cache:  # teardown
-            json.dump(storage_dict, cache)
+            json.dump(storage_dict, cache, indent=2)
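With indent=2 the cached mapping of query keys to status endpoints becomes human-readable. A minimal, self-contained illustration; the file name, key, and URL below are placeholders rather than the client's actual values:

``` python
import json

storage_dict = {"<cache_key>": "https://toar-data.fz-juelich.de/api/v2/analysis/<status_id>"}
with open("status_endpoints.json", "w") as cache:
    json.dump(storage_dict, cache, indent=2)
# status_endpoints.json now contains:
# {
#   "<cache_key>": "https://toar-data.fz-juelich.de/api/v2/analysis/<status_id>"
# }
```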
class Connection:
@@ -174,7 +174,27 @@ class Connection:
         self.cache = Cache(cache_dir)
         self.cache_backup = Cache(cache_dir, "status_endpoints.old")
-        # max wait time is 30min
-        self.wait_seconds = [minutes * 60 for minutes in (5, 5, 5, 5, 5, 5)]
+        self.wait_seconds = []
+
+    def setRequestTimes(self, interval_min, maxWait_min):
+        """Set the interval and the maximum duration to wait before requests to the analysis service are stopped.
+
+        The waiting intervals determine how long and how often the status endpoint is checked for available results.
+        As soon as the maximum waiting time is reached, the process is stopped. It can be restarted at any time,
+        as the required endpoint is stored in a cache file.
+
+        Parameters:
+        -----------
+        interval_min:
+            waiting interval in minutes
+        maxWait_min:
+            maximum duration to wait in minutes
+        """
+        if maxWait_min <= 0:
+            raise RuntimeError("The maximum waiting time needs to be larger than 0 min.")
+        elif interval_min < 0 or interval_min > maxWait_min:
+            self.wait_seconds = [maxWait_min * 60]
+        else:
+            self.wait_seconds = [interval_min * 60 for _ in range(interval_min, maxWait_min + 1, interval_min)]

    def get(self, query_options: QueryOptions) -> requests.models.Response:
        """get results for a request.
@@ -262,20 +282,28 @@ class Connection:
         """
         try:
             response = self.wait_and_get(self.endpoint, asdict(query_options, dict_factory=quarryToDict))
+            url = response.history[0].url if response.history else response.url
+            if Connection.DEBUG:
+                print(f"[DEBUG] Original request: {url}")
+            if response.headers["Content-Type"] == "application/json":
+                status_endpoint = response.json()["status"]
+            #else:
+            #    raise Exception(f"Unexpected type of response: {response.headers['Content-Type']}")
+            #TODO: can this raise cause a problem?
+            response.raise_for_status()
         except requests.exceptions.HTTPError as e:
             print("A connection error occurred:")
             self.printExecption(e, response)
             raise e
         except requests.exceptions.ReadTimeout as e:
             print("Caught read timeout.")
+            self.printExecption(e, response)
             raise RuntimeError("Read timeout while querying for status endpoint")
-        try:
-            if response.headers["Content-Type"] == "application/json":
-                status_endpoint = response.json()["status"]
-            else:
-                raise Exception(f"Unexpected type of response: {response.headers['Content-Type']}")
         except:
             raise RuntimeError(f"Request was not successful. Response by TOAR database: {response.text}")
+        # we made it: let's remember the status endpoint to get our results later :-)
         self.cache.put(query_options.cache_key, status_endpoint)
         return status_endpoint
@@ -307,7 +335,10 @@ class Connection:
             print(f"Text: {e.response.text}")
         print(f"{response=}")
         print(f"{response.content=}")
-        print(response.json())
+        try:
+            print(response.json())
+        except Exception:
+            print("Decoding as json failed.")

class AnalysisService:
...