Select Git revision
join.py 5.93 KiB
# Module metadata (authors and date stamp).
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-16'
import datetime as dt
import logging
from typing import Iterator, Union, List, Dict, Tuple

import pandas as pd
import requests

from src import helpers
# Base url of the JOIN REST interface; used as 'base' option when request urls are created.
join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
# Type alias for arguments that are either a string or None.
str_or_none = Union[str, None]
class EmptyQueryResult(Exception):
    """Exception that gets raised if a query to JOIN returns an empty result."""
def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None,
                  network_name: str = None) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Read data from JOIN/TOAR.

    For every series available at the station that is also requested in stat_var, the daily statistic is downloaded
    and collected as one column of the returned data frame.

    :param station_name: Station name e.g. DEBY122
    :param stat_var: variable names as keys (matched case-insensitively against the available series, e.g. 'o3' or
        'O3') and the statistic to request for this variable as values (e.g. 'dma8eu', 'average_values')
    :param station_type: set the station type like "traffic" or "background", can be none
    :param network_name: set the measurement network like "UBA" or "AIRBASE", can be none
    :returns: tuple of
        - df - data frame with all variables and statistics (one column per variable, named by the lower case
          variable name)
        - meta - data frame with all meta information (taken from the last downloaded series)
    :raises EmptyQueryResult: if no requested variable is available for the station or the last response is empty
    """
    # make sure station_name parameter is a list
    station_name = helpers.to_list(station_name)

    # load series information
    vars_dict = load_series_information(station_name, station_type, network_name)

    # Series names are compared in lower case, therefore also offer the requested variables in lower case. The
    # original keys are applied last so that an exact match always keeps precedence over a case-folded one.
    requested = {name.lower(): statistic for name, statistic in stat_var.items() if isinstance(name, str)}
    requested.update(stat_var)

    # download all variables with given statistic
    data = None
    df = None
    for var in _lower_list(sorted(vars_dict)):
        if var not in requested:
            continue
        statistic = requested[var]
        logging.info('load: %s', var)

        # create data link
        opts = {'base': join_url_base, 'service': 'stats', 'id': vars_dict[var], 'statistics': statistic,
                'sampling': 'daily', 'capture': 0, 'min_data_length': 1460}

        # load data
        data = get_data(opts)

        # correct namespace of statistics
        stat = _correct_stat_name(statistic)

        # store data in pandas dataframe
        df = _save_to_pandas(df, data, stat, var)

        logging.debug('finished: %s', var)

    # data is still None if no variable matched, or falsy if the last response was empty
    if not data:
        raise EmptyQueryResult("No data found in JOIN.")

    # meta information is read from the response of the last downloaded variable
    meta = pd.DataFrame.from_dict(data['metadata'], orient='index')
    meta.columns = station_name
    return df, meta
def get_data(opts: Dict) -> Union[Dict, List]:
    """
    Request data from JOIN with the requests framework and return the decoded json content.

    The request url is built from the given options. Depending on the structure of the response, the decoded content
    is either a list or a dictionary.

    :param opts: options to create the request url
    :return: requested data (either as list or dictionary)
    """
    request_url = create_url(**opts)
    return requests.get(request_url).json()
def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none) -> Dict:
    """
    Collect the ids of all series that are available for the given station, station type and network.

    Only the first entry of station_name is used for the query.

    :param station_name: Station name e.g. DEBW107
    :param station_type: station type like "traffic" or "background"
    :param network_name: measurement network of the station like "UBA" or "AIRBASE"
    :return: dictionary of all available series of the requested station with the lower case parameter name
        (variable) as key and the series id as value
    """
    query = {"base": join_url_base, "service": "series", "station_id": station_name[0],
             "station_type": station_type, "network_name": network_name}
    series_ids = {}
    for entry in get_data(query):
        # entry[3] holds the parameter name, entry[0] the series id
        series_ids[entry[3].lower()] = entry[0]
    return series_ids
def _save_to_pandas(df: Union[pd.DataFrame, None], data: dict, stat: str, var: str) -> pd.DataFrame:
    """
    Store given data in a data frame. If a data frame is given, the data is appended as new column.

    :param df: data frame to append the new data to, can be none
    :param data: new data containing the keys 'datetime' (strings formatted as "%Y-%m-%d %H:%M") and '<stat>'
    :param stat: statistic used to extract the values from data (e.g. 'mean', 'dma8eu')
    :param var: variable the data is from (e.g. 'o3'), used as column name
    :return: newly created or concatenated data frame
    """
    timestamps = [dt.datetime.strptime(stamp, "%Y-%m-%d %H:%M") for stamp in data['datetime']]
    column = pd.DataFrame(data[stat], index=timestamps, columns=[var])
    if df is None:
        return column
    return pd.concat([df, column], axis=1)
def _correct_stat_name(stat: str) -> str:
    """
    Translate a statistic name from the JOIN namespace to the local namespace.

    Names without an entry in the translation table are returned unchanged.

    :param stat: namespace from JOIN server
    :return: stat mapped to local namespace
    """
    translation = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'}
    if stat in translation:
        return translation[stat]
    return stat
def _lower_list(args: List[str]) -> Iterator[str]:
    """
    Lazily convert all entries of the given list to lower case.

    :param args: list with string entries to lower
    :return: iterator that yields the lower case version of each list entry
    """
    yield from (entry.lower() for entry in args)
def create_url(base: str, service: str, **kwargs: Union[str, int, float, None]) -> str:
    """
    Build a request url from the base url, the service type and arbitrarily many additional keyword arguments.

    Keyword arguments with value None are left out of the query string.

    :param base: basic url of the rest service
    :param service: service type, e.g. series, stats
    :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum'
    :return: combined url as string
    """
    prefix = base if base.endswith("/") else base + "/"
    query = "&".join(f"{key}={value}" for key, value in kwargs.items() if value is not None)
    return f"{prefix}{service}/?{query}"
if __name__ == "__main__":
    # example run: request one statistic per variable for a single station
    var_all_dic = {
        'o3': 'dma8eu',
        'relhum': 'average_values',
        'temp': 'maximum',
        'u': 'average_values',
        'v': 'average_values',
        'no': 'dma8eu',
        'no2': 'dma8eu',
        'cloudcover': 'average_values',
        'pblheight': 'maximum',
    }
    station = 'DEBW107'
    download_join(station_name=station, stat_var=var_all_dic)