Select Git revision
maestro-schema-schema.yaml
join.py 3.90 KiB
__author__ = 'Felix Kleinert, Lukas Leufen'
__date__ = '2019-10-16'
import requests
import json
import logging
import pandas as pd
import datetime as dt
from typing import Iterator, Union, List
from src import helpers
# Base URL of the JOIN REST interface; create_url appends the service name and the query string to it.
join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
# Configure the root logger when this module is imported, with INFO as the threshold level.
logging.basicConfig(level=logging.INFO)
def download_join(station_name: Union[str, List[str]], statvar: dict) -> [pd.DataFrame, pd.DataFrame]:
    """
    Read data from JOIN/TOAR for a single station.

    The series registered for the station are looked up first. For every series whose variable name is a key of
    `statvar`, the requested statistic is downloaded with daily sampling and stored as one column of the result.
    Variable names are compared case-insensitively, so 'O3' and 'o3' select the same series.

    :param station_name: Station name e.g. DEBY122. A list is accepted, but only its first entry is queried.
    :param statvar: key as variable like 'O3', values as statistics on keys like 'mean'
    :returns:
        - df - pandas df with all variables and statistics (one column per loaded variable, named in lower case)
        - meta - pandas df with all meta information, taken from the last downloaded variable
    :raises ValueError: if no requested variable could be loaded for the station
    """
    # make sure station_name parameter is a list
    station_name = helpers.to_list(station_name)

    # Series names are lower-cased below, so normalise the requested variable names the same way. Without this a
    # key such as 'O3' never matches and the variable is skipped silently.
    requested = {key.lower(): stat for key, stat in statvar.items()}

    # load series information
    opts = {'base': join_url_base, 'service': 'series', 'station_id': station_name[0]}
    station_vars = requests.get(create_url(**opts)).json()
    # lower-cased name (4th field of each series entry) -> id used to query the stats service (1st field)
    vars_dict = {item[3].lower(): item[0] for item in station_vars}

    # download all variables with given statistic
    data = None
    df = None
    for var in sorted(vars_dict):
        if var not in requested:
            continue
        logging.info('load: {}'.format(var))

        # create data link and load data
        opts = {'base': join_url_base, 'service': 'stats', 'id': vars_dict[var], 'statistics': requested[var],
                'sampling': 'daily', 'capture': 0, 'min_data_length': 1460}
        data = requests.get(create_url(**opts)).json()

        # correct namespace of statistics
        stat = _correct_stat_name(requested[var])

        # store data in pandas dataframe; the index is built as a list so it can be inspected and reused
        index = [dt.datetime.strptime(stamp, "%Y-%m-%d %H:%M") for stamp in data['datetime']]
        column = pd.DataFrame(data[stat], index=index, columns=[var])
        df = column if df is None else pd.concat([df, column], axis=1)
        logging.debug('finished: {}'.format(var))

    # data is still None when no series of the station matched a requested variable
    if not data:
        raise ValueError("No data found in JOIN.")

    meta = pd.DataFrame.from_dict(data['metadata'], orient='index')
    # NOTE(review): presumably meta has a single column, so this assumes station_name holds exactly one
    # entry - confirm before passing several stations.
    meta.columns = station_name
    return df, meta
def _correct_stat_name(stat: str) -> str:
"""
Map given statistic name to new namespace defined by mapping dict. Return given name stat if not element of mapping
namespace.
:param stat: namespace from JOIN server
:return: stat mapped to local namespace
"""
mapping = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'}
return mapping.get(stat, stat)
def _lower_list(args: List[str]) -> Iterator[str]:
"""
lower all elements of given list
:param args: list with string entries to lower
:return: iterator that lowers all list entries
"""
for string in args:
yield string.lower()
def create_url(base: str, service: str, **kwargs: Union[str, int, float]) -> str:
    """
    Build a request url from the base url, the service type and any number of additional keyword arguments.

    The keyword arguments are appended as 'key=value' pairs joined by '&', in the order they were passed.

    :param base: basic url of the rest service
    :param service: service type, e.g. series, stats
    :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum'
    :return: combined url as string
    """
    query_parts = []
    for key, value in kwargs.items():
        query_parts.append('{}={}'.format(key, value))
    endpoint = '{}{}/?'.format(base, service)
    return endpoint + '&'.join(query_parts)
if __name__ == "__main__":
    # Example run: request one statistic per variable for a single station.
    station = 'DEBW107'
    var_all_dic = {
        'o3': 'dma8eu',
        'relhum': 'average_values',
        'temp': 'maximum',
        'u': 'average_values',
        'v': 'average_values',
        'no': 'dma8eu',
        'no2': 'dma8eu',
        'cloudcover': 'average_values',
        'pblheight': 'maximum',
    }
    download_join(station_name=station, statvar=var_all_dic)