Commit 386d8666 authored by Lukas Leufen

Merge branch 'lukas_issue396_feat_toardb-v2' into 'lukas_issue393_feat_era5-data'

Resolve "Load data from ToarDB V2"

See merge request !440
parents 8d3f92f1 9b59692a
Pipeline #105112 passed with stages in 15 minutes and 35 seconds
......@@ -9,6 +9,7 @@ pytest-lazy-fixture==0.6.3
pytest-metadata==1.11.0
pytest-sugar==0.9.4
tabulate==0.8.8
timezonefinder==5.2.0
wget==3.2
--no-binary shapely Shapely==1.7.0
......
......@@ -9,6 +9,7 @@ pytest-lazy-fixture==0.6.3
pytest-metadata==1.11.0
pytest-sugar==0.9.4
tabulate==0.8.8
timezonefinder==5.2.0
wget==3.2
--no-binary shapely Shapely==1.7.0
......
......@@ -9,7 +9,6 @@ DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']
DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
'pblheight': 'maximum'}
DEFAULT_NETWORK = "AIRBASE"
DEFAULT_STATION_TYPE = "background"
DEFAULT_VARIABLES = DEFAULT_VAR_ALL_DICT.keys()
DEFAULT_START = "1997-01-01"
......
"""Settings to access https://toar-data.fz-juelich.de"""
from typing import Tuple, Dict
def toar_data_v2_settings(sampling="daily") -> Tuple[str, Dict]:
"""
Set URL for toar-data and required headers. Header information is not required for now.
:param sampling: temporal resolution to access.
:return: service URL and optional headers
"""
if sampling == "daily": # pragma: no branch
TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/statistics/api/v1/"
headers = {}
elif sampling == "hourly" or sampling == "meta":
TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2/"
headers = {}
else:
raise NameError(f"Given sampling {sampling} is not supported, choose from either daily or hourly sampling.")
return TOAR_SERVICE_URL, headers
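
A minimal usage sketch of the new settings helper (import path as introduced in this merge request, see the era5.py hunk further below; the returned URLs are copied from the branches above):

from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings

url, headers = toar_data_v2_settings("daily")    # statistics endpoint, empty headers
print(url)                                       # https://toar-data.fz-juelich.de/statistics/api/v1/
url, headers = toar_data_v2_settings("hourly")   # raw v2 API, also returned for sampling="meta"
print(url)                                       # https://toar-data.fz-juelich.de/api/v2/
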
......@@ -63,8 +63,7 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation):
vars = [self.variables, self.target_var]
stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind])
data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind],
self.station_type, self.network, self.store_data_locally, self.data_origin,
self.start, self.end)
self.store_data_locally, self.data_origin, self.start, self.end)
data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind],
limit=self.interpolation_limit[ind], sampling=self.sampling[ind])
......@@ -147,8 +146,7 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi
stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind])
data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind],
self.station_type, self.network, self.store_data_locally, self.data_origin,
start, end)
self.store_data_locally, self.data_origin, start, end)
data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind],
limit=self.interpolation_limit[ind], sampling=self.sampling[ind])
return data
......
......@@ -20,8 +20,9 @@ import xarray as xr
from mlair.configuration import check_path_and_create
from mlair import helpers
from mlair.helpers import join, statistics, TimeTrackingWrapper, filter_dict_by_value, select_from_dict, era5
from mlair.helpers import statistics, TimeTrackingWrapper, filter_dict_by_value, select_from_dict
from mlair.data_handler.abstract_data_handler import AbstractDataHandler
from mlair.helpers import data_sources
# define a more general date type for type hinting
date = Union[dt.date, dt.datetime]
......@@ -38,8 +39,6 @@ class DataHandlerSingleStation(AbstractDataHandler):
indicates that not all values up to t0 are used, a positive values indicates usage of values at t>t0. Default
is 0.
"""
DEFAULT_STATION_TYPE = "background"
DEFAULT_NETWORK = "AIRBASE"
DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
'pblheight': 'maximum'}
......@@ -58,12 +57,11 @@ class DataHandlerSingleStation(AbstractDataHandler):
chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5", "propane",
"so2", "toluene"]
_hash = ["station", "statistics_per_var", "data_origin", "station_type", "network", "sampling", "target_dim",
"target_var", "time_dim", "iter_dim", "window_dim", "window_history_size", "window_history_offset",
"window_lead_time", "interpolation_limit", "interpolation_method", "variables", "window_history_end"]
_hash = ["station", "statistics_per_var", "data_origin", "sampling", "target_dim", "target_var", "time_dim",
"iter_dim", "window_dim", "window_history_size", "window_history_offset", "window_lead_time",
"interpolation_limit", "interpolation_method", "variables", "window_history_end"]
def __init__(self, station, data_path, statistics_per_var=None, station_type=DEFAULT_STATION_TYPE,
network=DEFAULT_NETWORK, sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING,
def __init__(self, station, data_path, statistics_per_var=None, sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING,
target_dim=DEFAULT_TARGET_DIM, target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM,
iter_dim=DEFAULT_ITER_DIM, window_dim=DEFAULT_WINDOW_DIM,
window_history_size=DEFAULT_WINDOW_HISTORY_SIZE, window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET,
......@@ -87,8 +85,6 @@ class DataHandlerSingleStation(AbstractDataHandler):
self.input_data, self.target_data = None, None
self._transformation = self.setup_transformation(transformation)
self.station_type = station_type
self.network = network
self.sampling = sampling
self.target_dim = target_dim
self.target_var = target_var
......@@ -140,9 +136,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
return self._data.shape, self.get_X().shape, self.get_Y().shape
def __repr__(self):
return f"StationPrep(station={self.station}, data_path='{self.path}', " \
return f"StationPrep(station={self.station}, data_path='{self.path}', data_origin={self.data_origin}, " \
f"statistics_per_var={self.statistics_per_var}, " \
f"station_type='{self.station_type}', network='{self.network}', " \
f"sampling='{self.sampling}', target_dim='{self.target_dim}', target_var='{self.target_var}', " \
f"time_dim='{self.time_dim}', window_history_size={self.window_history_size}, " \
f"window_lead_time={self.window_lead_time}, interpolation_limit={self.interpolation_limit}, " \
......@@ -169,8 +164,12 @@ class DataHandlerSingleStation(AbstractDataHandler):
return self.get_transposed_label()
def get_coordinates(self):
coords = self.meta.loc[["station_lon", "station_lat"]].astype(float)
return coords.rename(index={"station_lon": "lon", "station_lat": "lat"}).to_dict()[str(self)]
try:
coords = self.meta.loc[["station_lon", "station_lat"]].astype(float)
coords = coords.rename(index={"station_lon": "lon", "station_lat": "lat"})
except KeyError:
coords = self.meta.loc[["lon", "lat"]].astype(float)
return coords.to_dict()[str(self)]
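
The try/except above covers both meta layouts that can now end up on disk: the JOIN-style index ("station_lon"/"station_lat") and the TOAR-Data v2 style ("lon"/"lat"). A small self-contained illustration of the fallback path, with an invented station code and coordinates:

import pandas as pd

meta = pd.DataFrame({"DEBW107": {"lon": "9.2", "lat": "48.7"}})  # v2-style meta, values stored as strings
coords = meta.loc[["lon", "lat"]].astype(float)
print(coords.to_dict()["DEBW107"])  # {'lon': 9.2, 'lat': 48.7}
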
def call_transform(self, inverse=False):
opts_input = self._transformation[0]
......@@ -301,8 +300,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
def make_input_target(self):
data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling,
self.station_type, self.network, self.store_data_locally, self.data_origin,
self.start, self.end)
self.store_data_locally, self.data_origin, self.start, self.end)
self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
limit=self.interpolation_limit, sampling=self.sampling)
self.set_inputs_and_targets()
......@@ -320,8 +318,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
self.make_observation(self.target_dim, self.target_var, self.time_dim)
self.remove_nan(self.time_dim)
def load_data(self, path, station, statistics_per_var, sampling, station_type=None, network=None,
store_data_locally=False, data_origin: Dict = None, start=None, end=None):
def load_data(self, path, station, statistics_per_var, sampling, store_data_locally=False,
data_origin: Dict = None, start=None, end=None):
"""
Load data and meta data either from local disk (preferred) or download new data by using a custom download method.
......@@ -339,23 +337,20 @@ class DataHandlerSingleStation(AbstractDataHandler):
if os.path.exists(meta_file):
os.remove(meta_file)
data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
station_type=station_type, network=network,
store_data_locally=store_data_locally, data_origin=data_origin,
time_dim=self.time_dim, target_dim=self.target_dim,
iter_dim=self.iter_dim)
time_dim=self.time_dim, target_dim=self.target_dim, iter_dim=self.iter_dim)
logging.debug(f"loaded new data")
else:
try:
logging.debug(f"try to load local data from: {file_name}")
data = xr.open_dataarray(file_name)
meta = pd.read_csv(meta_file, index_col=0)
self.check_station_meta(meta, station, station_type, network, data_origin)
self.check_station_meta(meta, station, data_origin, statistics_per_var)
logging.debug("loading finished")
except FileNotFoundError as e:
logging.debug(e)
logging.debug(f"load new data")
data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
station_type=station_type, network=network,
store_data_locally=store_data_locally, data_origin=data_origin,
time_dim=self.time_dim, target_dim=self.target_dim,
iter_dim=self.iter_dim)
......@@ -366,9 +361,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
return data, meta
def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling,
station_type=None, network=None, store_data_locally=True, data_origin: Dict = None,
time_dim=DEFAULT_TIME_DIM, target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) \
-> [xr.DataArray, pd.DataFrame]:
store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM,
target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) -> [xr.DataArray, pd.DataFrame]:
"""
Download data from the TOAR database (v2, with v1/JOIN fallback) or load local ERA5 data.
......@@ -381,31 +375,36 @@ class DataHandlerSingleStation(AbstractDataHandler):
:return: downloaded data and its meta data
"""
df_all = {}
df_era5, df_join = None, None
meta_era5, meta_join = None, None
df_era5, df_toar = None, None
meta_era5, meta_toar = None, None
if data_origin is not None:
era5_origin = filter_dict_by_value(data_origin, "era5", True)
era5_stats = select_from_dict(statistics_per_var, era5_origin.keys())
join_origin = filter_dict_by_value(data_origin, "era5", False)
join_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False)
assert len(era5_origin) + len(join_origin) == len(data_origin)
assert len(era5_stats) + len(join_stats) == len(statistics_per_var)
toar_origin = filter_dict_by_value(data_origin, "era5", False)
toar_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False)
assert len(era5_origin) + len(toar_origin) == len(data_origin)
assert len(era5_stats) + len(toar_stats) == len(statistics_per_var)
else:
era5_origin, join_origin = None, None
era5_stats, join_stats = statistics_per_var, statistics_per_var
era5_origin, toar_origin = None, None
era5_stats, toar_stats = statistics_per_var, statistics_per_var
# load data
if era5_origin is not None and len(era5_stats) > 0:
# load era5 data
df_era5, meta_era5 = era5.load_era5(station_name=station, stat_var=era5_stats, sampling=sampling,
data_origin=era5_origin)
if join_origin is None or len(join_stats) > 0:
# load join data
df_join, meta_join = join.download_join(station_name=station, stat_var=join_stats, station_type=station_type,
network_name=network, sampling=sampling, data_origin=join_origin)
df = pd.concat([df_era5, df_join], axis=1, sort=True)
meta = meta_era5 if meta_era5 is not None else meta_join
df_era5, meta_era5 = data_sources.era5.load_era5(station_name=station, stat_var=era5_stats,
sampling=sampling, data_origin=era5_origin)
if toar_origin is None or len(toar_stats) > 0:
# load combined data from toar-data (v2 & v1)
df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats,
sampling=sampling, data_origin=toar_origin)
df = pd.concat([df_era5, df_toar], axis=1, sort=True)
if meta_era5 is not None and meta_toar is not None:
meta = meta_era5.combine_first(meta_toar)
else:
meta = meta_era5 if meta_era5 is not None else meta_toar
meta.loc["data_origin"] = str(data_origin)
meta.loc["statistics_per_var"] = str(statistics_per_var)
df_all[station[0]] = df
# convert df_all to xarray
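
The era5/toar split at the top of this hunk leans on two dict helpers imported earlier in this file. A standalone sketch of the expected behaviour with an invented data_origin/statistics_per_var pair (the exact filter semantics are assumed from how the results are used above):

from mlair.helpers import filter_dict_by_value, select_from_dict

data_origin = {"o3": "", "temp": "era5", "pblheight": "era5"}
statistics_per_var = {"o3": "dma8eu", "temp": "maximum", "pblheight": "maximum"}

era5_origin = filter_dict_by_value(data_origin, "era5", True)    # expect {'temp': 'era5', 'pblheight': 'era5'}
toar_origin = filter_dict_by_value(data_origin, "era5", False)   # expect {'o3': ''}
era5_stats = select_from_dict(statistics_per_var, era5_origin.keys())
toar_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False)
assert len(era5_origin) + len(toar_origin) == len(data_origin)
assert len(era5_stats) + len(toar_stats) == len(statistics_per_var)
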
......@@ -418,22 +417,21 @@ class DataHandlerSingleStation(AbstractDataHandler):
return xarr, meta
@staticmethod
def check_station_meta(meta, station, station_type, network, data_origin):
def check_station_meta(meta, station, data_origin, statistics_per_var):
"""
Search for the entries in the meta data and compare the values with the requested values.
Will raise a FileNotFoundError if the values mismatch.
"""
if station_type is not None:
check_dict = {"station_type": station_type, "network_name": network, "data_origin": str(data_origin)}
for (k, v) in check_dict.items():
if v is None or k not in meta:
continue
if meta.at[k, station[0]] != v:
logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
f"{meta.at[k, station[0]]} (local). Raise FileNotFoundError to trigger new "
f"grapping from web.")
raise FileNotFoundError
check_dict = {"data_origin": str(data_origin), "statistics_per_var": str(statistics_per_var)}
for (k, v) in check_dict.items():
if v is None or k not in meta.index:
continue
if meta.at[k, station[0]] != v:
logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
f"{meta.at[k, station[0]]} (local). Raise FileNotFoundError to trigger new "
f"grapping from web.")
raise FileNotFoundError
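
For reference, the two rows the new check looks at are written in download_data as plain str(...) representations (see the meta.loc[...] assignments in the previous hunk). A tiny illustration with made-up values:

import pandas as pd

requested_origin = {"o3": ""}
requested_stats = {"o3": "dma8eu"}
meta = pd.DataFrame({"DEBW107": {"data_origin": str(requested_origin),
                                 "statistics_per_var": str(requested_stats)}})
# check_station_meta compares these cells against str(...) of the current request;
# any mismatch raises FileNotFoundError and thereby forces a fresh download.
assert meta.at["data_origin", "DEBW107"] == str({"o3": ""})
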
def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray:
"""
......
......@@ -68,8 +68,7 @@ class DataHandlerFilterSingleStation(DataHandlerSingleStation):
def make_input_target(self):
data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling,
self.station_type, self.network, self.store_data_locally, self.data_origin,
self.start, self.end)
self.store_data_locally, self.data_origin, self.start, self.end)
self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
limit=self.interpolation_limit)
self.set_inputs_and_targets()
......
......@@ -22,7 +22,7 @@ import xarray as xr
from mlair.data_handler.abstract_data_handler import AbstractDataHandler
from mlair.helpers import remove_items, to_list, TimeTrackingWrapper
from mlair.helpers.join import EmptyQueryResult
from mlair.helpers.data_sources.toar_data import EmptyQueryResult
number = Union[float, int]
......@@ -168,7 +168,7 @@ class DefaultDataHandler(AbstractDataHandler):
dim = self.time_dim
intersect = reduce(np.intersect1d, map(lambda x: x.coords[dim].values, X_original))
if len(intersect) < max(self.min_length, 1):
X, Y = None, None
raise ValueError(f"There is no intersection of X.")
else:
X = list(map(lambda x: x.sel({dim: intersect}), X_original))
Y = Y_original.sel({dim: intersect})
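
The handler now raises instead of silently setting X and Y to None when the time axes of the inputs do not overlap. A minimal standalone version of the intersection step with invented datetime axes:

from functools import reduce
import numpy as np

time_axes = [np.array(["2020-01-01", "2020-01-02", "2020-01-03"], dtype="datetime64[D]"),
             np.array(["2020-01-02", "2020-01-03", "2020-01-04"], dtype="datetime64[D]")]
intersect = reduce(np.intersect1d, time_axes)
if len(intersect) < 1:
    raise ValueError("There is no intersection of X.")
print(intersect)  # ['2020-01-02' '2020-01-03']
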
......@@ -205,10 +205,6 @@ class DefaultDataHandler(AbstractDataHandler):
if True only extract values larger than extreme_values
:param timedelta: used as arguments for np.timedelta in order to mark extreme values on datetime
"""
# check if X or Y is None
if (self._X is None) or (self._Y is None):
logging.debug(f"{str(self.id_class)} has no data for X or Y, skip multiply extremes")
return
if extreme_values is None:
logging.debug(f"No extreme values given, skip multiply extremes")
self._X_extreme, self._Y_extreme = self._X, self._Y
......
"""
Data Sources.
The module data_sources collects different data sources, namely ERA5, TOAR-Data v1 (JOIN), and TOAR-Data v2.
"""
__author__ = "Lukas Leufen"
__date__ = "2022-07-05"
from . import era5, join, toar_data, toar_data_v2
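
A sketch of how the rest of this merge request reaches the new package (function and keyword names are taken from the download_data hunk above; the station and statistics values are illustrative, and the call needs network access to toar-data.fz-juelich.de):

from mlair.helpers import data_sources

df, meta = data_sources.toar_data.download_toar(
    station=["DEBW107"], toar_stats={"o3": "dma8eu"}, sampling="daily", data_origin=None)
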
......@@ -5,14 +5,14 @@ __date__ = "2022-06-09"
import logging
import os
import numpy as np
import pandas as pd
import xarray as xr
from mlair import helpers
from mlair.configuration.era5_settings import era5_settings
from mlair.configuration.join_settings import join_settings
from mlair.helpers.join import load_meta_data, EmptyQueryResult
from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data, correct_timezone
from mlair.helpers.data_sources.toar_data import EmptyQueryResult
from mlair.helpers.meteo import relative_humidity_from_dewpoint
......@@ -30,15 +30,16 @@ def load_era5(station_name, stat_var, sampling, data_origin):
else:
raise ValueError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.")
# get data connection settings
# load series information (lat/lon) from join database
join_url_base, headers = join_settings()
meta = load_meta_data(station_name, None, None, join_url_base, headers)
# load station meta using toar-data v2 API
meta_url_base, headers = toar_data_v2_settings("meta")
station_meta = load_station_information(station_name, meta_url_base, headers)
# sel data for station using sel method nearest
data = xr.open_mfdataset(os.path.join(data_path, file_names))
station_dask = data.sel(lon=meta["station_lon"], lat=meta["station_lat"], method="nearest", drop=True)
station_data = station_dask.to_array().T.compute()
logging.info(f"load data for {station_meta['codes'][0]} from ERA5")
with xr.open_mfdataset(os.path.join(data_path, file_names)) as data:
lon, lat = station_meta["coordinates"]["lng"], station_meta["coordinates"]["lat"]
station_dask = data.sel(lon=lon, lat=lat, method="nearest", drop=True)
station_data = station_dask.to_array().T.compute()
# transform data and meta to pandas
station_data = station_data.to_pandas()
......@@ -55,10 +56,23 @@ def load_era5(station_name, stat_var, sampling, data_origin):
else:
station_data = station_data[stat_var]
meta = pd.DataFrame.from_dict(meta, orient="index", columns=station_name)
# convert to local timezone
station_data = correct_timezone(station_data, station_meta, sampling)
variable_meta = _emulate_meta_data(station_data)
meta = combine_meta_data(station_meta, variable_meta)
meta = pd.DataFrame.from_dict(meta, orient='index')
meta.columns = station_name
return station_data, meta
def _emulate_meta_data(station_data):
general_meta = {"sampling_frequency": "hourly", "data_origin": "model", "data_origin_type": "model"}
roles_meta = {"roles": [{"contact": {"organisation": {"name": "ERA5", "longname": "ECMWF"}}}]}
variable_meta = {var: {"variable": {"name": var}, **roles_meta, ** general_meta} for var in station_data.columns}
return variable_meta
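
Roughly what the emulated meta data above produces per variable (a sketch assuming _emulate_meta_data from this file is in scope; the DataFrame is a throwaway stand-in for ERA5 station data):

import pandas as pd

station_data = pd.DataFrame({"temp": [280.1], "press": [1012.0]})
variable_meta = _emulate_meta_data(station_data)
# variable_meta["temp"] is expected to look like:
# {'variable': {'name': 'temp'},
#  'roles': [{'contact': {'organisation': {'name': 'ERA5', 'longname': 'ECMWF'}}}],
#  'sampling_frequency': 'hourly', 'data_origin': 'model', 'data_origin_type': 'model'}
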
def _rename_era5_variables(era5_names):
mapper = {"SP": "press", "U10M": "u", "V10M": "v", "T2M": "temp", "D2M": "dew", "BLH": "pblheight",
"TCC": "cloudcover", "RHw": "relhum"}
......
......@@ -4,36 +4,27 @@ __date__ = '2019-10-16'
import datetime as dt
import logging
from typing import Iterator, Union, List, Dict
from typing import Iterator, Union, List, Dict, Tuple
import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from mlair import helpers
from mlair.configuration.join_settings import join_settings
from mlair.helpers.data_sources import toar_data, toar_data_v2
# join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
str_or_none = Union[str, None]
class EmptyQueryResult(Exception):
"""Exception that get raised if a query to JOIN returns empty results."""
pass
def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None,
network_name: str = None, sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame,
pd.DataFrame]:
sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame, pd.DataFrame]:
"""
Read data from JOIN/TOAR.
:param station_name: Station name e.g. DEBY122
:param stat_var: key as variable like 'O3', values as statistics on keys like 'mean'
:param station_type: set the station type like "traffic" or "background", can be none
:param network_name: set the measurement network like "UBA" or "AIRBASE", can be none
:param sampling: sampling rate of the downloaded data, either set to daily or hourly (default daily)
:param data_origin: additional dictionary to specify data origin as key (for variable) value (origin) pair. Valid
origins are "REA" for reanalysis data and "" (empty string) for observational data.
......@@ -43,25 +34,23 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
# make sure station_name parameter is a list
station_name = helpers.to_list(station_name)
# also ensure that given data_origin dict is no reference
if data_origin is None or len(data_origin) == 0:
data_origin = None
else:
data_origin = {k: v for (k, v) in data_origin.items()}
# split network and origin information
data_origin, network_name = split_network_and_origin(data_origin)
# get data connection settings
join_url_base, headers = join_settings(sampling)
# load series information
vars_dict, data_origin = load_series_information(station_name, station_type, network_name, join_url_base, headers,
data_origin)
data_origin, stat_var)
# check if all requested variables are available
if set(stat_var).issubset(vars_dict) is False:
missing_variables = set(stat_var).difference(vars_dict)
origin = helpers.select_from_dict(data_origin, missing_variables)
options = f"station={station_name}, type={station_type}, network={network_name}, origin={origin}"
raise EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in JOIN.")
raise toar_data.EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in "
f"JOIN.")
# correct stat_var values if data is not aggregated (hourly)
if sampling == "hourly":
......@@ -70,6 +59,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
# download all variables with given statistic
data = None
df = None
meta = {}
logging.info(f"load data for {station_name[0]} from JOIN")
for var in _lower_list(sorted(vars_dict.keys())):
if var in stat_var.keys():
......@@ -81,7 +71,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
'sampling': sampling, 'capture': 0, 'format': 'json'}
# load data
data = get_data(opts, headers)
data = toar_data.get_data(opts, headers)
# adjust data format if given as list of list
# no branch cover because this just happens when downloading hourly data using a secret token, not available
......@@ -94,15 +84,94 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
# store data in pandas dataframe
df = _save_to_pandas(df, data, stat, var)
meta[var] = _correct_meta(data["metadata"])
logging.debug('finished: {}'.format(var))
if data:
meta = pd.DataFrame.from_dict(data['metadata'], orient='index')
# load station meta using toar-data v2 API and convert to local timezone
meta_url_base, headers = toar_data_v2.toar_data_v2_settings("meta")
station_meta = toar_data_v2.load_station_information(station_name, meta_url_base, headers)
df = toar_data_v2.correct_timezone(df, station_meta, sampling)
# create meta data
meta = toar_data_v2.combine_meta_data(station_meta, meta)
meta = pd.DataFrame.from_dict(meta, orient='index')
meta.columns = station_name
return df, meta
else:
raise EmptyQueryResult("No data found in JOIN.")
raise toar_data.EmptyQueryResult("No data found in JOIN.")
def _correct_meta(meta):
meta_out = {}
for k, v in meta.items():
if k.startswith("station"):
_k = k.split("_", 1)[1]
_d = meta_out.get("station", {})
_d[_k] = v
meta_out["station"] = _d
elif k.startswith("parameter"):
_k = k.split("_", 1)[1]
_d = meta_out.get("variable", {})
_d[_k] = v
meta_out["variable"] = _d
elif k == "network_name":
if v == "AIRBASE":
_d = {"name": "EEA", "longname": "European Environment Agency", "kind": "government"}
elif v == "UBA":
_d = {"name": "UBA", "longname": "Umweltbundesamt", "kind": "government", "country": "Germany"}
else:
_d = {"name": v}
meta_out["roles"] = [{"contact": {"organisation": _d}}]
elif k in ["google_resolution", "numid"]:
continue
else:
meta_out[k] = v
return meta_out
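
A before/after sketch of the reshaping done by _correct_meta (the input keys follow the prefixes handled above; the concrete values are invented):

join_meta = {"station_id": "DEBW107", "station_lon": 9.2, "parameter_name": "o3",
             "network_name": "UBA", "numid": 123}
print(_correct_meta(join_meta))
# expected:
# {'station': {'id': 'DEBW107', 'lon': 9.2},
#  'variable': {'name': 'o3'},
#  'roles': [{'contact': {'organisation': {'name': 'UBA', 'longname': 'Umweltbundesamt',
#                                          'kind': 'government', 'country': 'Germany'}}}]}
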
def split_network_and_origin(origin_network_dict: dict) -> Tuple[Union[None, dict], Union[None, dict]]:
"""
Split given dict into network and data origin.
Method is required to transform the Toar-Data v2 structure (using only origin) into the Toar-Data v1 (JOIN) structure
(which uses origin and network parameters). Furthermore, the EEA network (v2) is renamed to AIRBASE (v1).
"""
if origin_network_dict is None or len(origin_network_dict) == 0:
data_origin, network = None, None
else:
data_origin = {}
network = {}
for k, v in origin_network_dict.items():
network[k] = []
for _network in helpers.to_list(v):
if _network.lower() == "EEA".lower():
network[k].append("AIRBASE")
elif _network.lower() != "REA".lower():
network[k].append(_network)
if "REA" in v:
data_origin[k] = "REA"
else:
data_origin[k] = ""
network[k] = filter_network(network[k])
return data_origin, network
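
An illustration of the split with an invented request (EEA is mapped to AIRBASE, REA ends up as the data origin; what filter_network returns for an empty list is assumed from its Union[list, None] type hint below):

request = {"o3": ["EEA"], "no2": ["UBA"], "temp": ["REA"]}
data_origin, network = split_network_and_origin(request)
# expected: data_origin -> {'o3': '', 'no2': '', 'temp': 'REA'}
#           network     -> {'o3': ['AIRBASE'], 'no2': ['UBA'], 'temp': None}
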
def filter_network(network: list) -> Union[list, None]:
"""
Filter given list of networks.
:param network: list of various network names (can contain duplicates)
:return: sorted list with unique entries