diff --git a/HPC_setup/requirements_HDFML_additionals.txt b/HPC_setup/requirements_HDFML_additionals.txt index ebfac3cd0d989a8845f2a3fceba33d562b898b8d..6102da7bcc5f61ee1c2ea0d205125a6e5ee641ea 100644 --- a/HPC_setup/requirements_HDFML_additionals.txt +++ b/HPC_setup/requirements_HDFML_additionals.txt @@ -9,6 +9,7 @@ pytest-lazy-fixture==0.6.3 pytest-metadata==1.11.0 pytest-sugar==0.9.4 tabulate==0.8.8 +timezonefinder==5.2.0 wget==3.2 --no-binary shapely Shapely==1.7.0 diff --git a/HPC_setup/requirements_JUWELS_additionals.txt b/HPC_setup/requirements_JUWELS_additionals.txt index ebfac3cd0d989a8845f2a3fceba33d562b898b8d..6102da7bcc5f61ee1c2ea0d205125a6e5ee641ea 100644 --- a/HPC_setup/requirements_JUWELS_additionals.txt +++ b/HPC_setup/requirements_JUWELS_additionals.txt @@ -9,6 +9,7 @@ pytest-lazy-fixture==0.6.3 pytest-metadata==1.11.0 pytest-sugar==0.9.4 tabulate==0.8.8 +timezonefinder==5.2.0 wget==3.2 --no-binary shapely Shapely==1.7.0 diff --git a/mlair/configuration/defaults.py b/mlair/configuration/defaults.py index b630261dbf58d7402f8c3cacaee153347ad4f1e3..f3e0496120fa15e958e3d62e68f7d0915bbcf938 100644 --- a/mlair/configuration/defaults.py +++ b/mlair/configuration/defaults.py @@ -9,7 +9,6 @@ DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087'] DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', 'pblheight': 'maximum'} -DEFAULT_NETWORK = "AIRBASE" DEFAULT_STATION_TYPE = "background" DEFAULT_VARIABLES = DEFAULT_VAR_ALL_DICT.keys() DEFAULT_START = "1997-01-01" diff --git a/mlair/configuration/toar_data_v2_settings.py b/mlair/configuration/toar_data_v2_settings.py new file mode 100644 index 0000000000000000000000000000000000000000..a8bb9f42cf5a1967f150aa18019c2dbdc89f43a2 --- /dev/null +++ b/mlair/configuration/toar_data_v2_settings.py @@ -0,0 +1,20 @@ +"""Settings to access https://toar-data.fz-juelich.de""" +from typing import Tuple, Dict + + +def toar_data_v2_settings(sampling="daily") -> Tuple[str, Dict]: + """ + Set url for toar-data and required headers. Headers information is not required for now. + + :param sampling: temporal resolution to access. 
+ :return: Service url and optional headers + """ + if sampling == "daily": # pragma: no branch + TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/statistics/api/v1/" + headers = {} + elif sampling == "hourly" or sampling == "meta": + TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2/" + headers = {} + else: + raise NameError(f"Given sampling {sampling} is not supported, choose from either daily or hourly sampling.") + return TOAR_SERVICE_URL, headers diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index c26f97cdd7ae43a7f6026801aef39435d517c428..eaa6a21175bd5f88c32c9c3cb74947c0cc0956a3 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -63,8 +63,7 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation): vars = [self.variables, self.target_var] stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind], - self.station_type, self.network, self.store_data_locally, self.data_origin, - self.start, self.end) + self.store_data_locally, self.data_origin, self.start, self.end) data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind], limit=self.interpolation_limit[ind], sampling=self.sampling[ind]) @@ -147,8 +146,7 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind], - self.station_type, self.network, self.store_data_locally, self.data_origin, - start, end) + self.store_data_locally, self.data_origin, start, end) data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind], limit=self.interpolation_limit[ind], sampling=self.sampling[ind]) return data diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index 6d3407eefd9b4a96f9e73f5f7e21fead0369d37b..d23c9f2b03a63061b2501984f20f182514ffcf9f 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -20,8 +20,9 @@ import xarray as xr from mlair.configuration import check_path_and_create from mlair import helpers -from mlair.helpers import join, statistics, TimeTrackingWrapper, filter_dict_by_value, select_from_dict, era5 +from mlair.helpers import statistics, TimeTrackingWrapper, filter_dict_by_value, select_from_dict from mlair.data_handler.abstract_data_handler import AbstractDataHandler +from mlair.helpers import data_sources # define a more general date type for type hinting date = Union[dt.date, dt.datetime] @@ -38,8 +39,6 @@ class DataHandlerSingleStation(AbstractDataHandler): indicates that not all values up to t0 are used, a positive values indicates usage of values at t>t0. Default is 0. 
""" - DEFAULT_STATION_TYPE = "background" - DEFAULT_NETWORK = "AIRBASE" DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', 'pblheight': 'maximum'} @@ -58,12 +57,11 @@ class DataHandlerSingleStation(AbstractDataHandler): chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5", "propane", "so2", "toluene"] - _hash = ["station", "statistics_per_var", "data_origin", "station_type", "network", "sampling", "target_dim", - "target_var", "time_dim", "iter_dim", "window_dim", "window_history_size", "window_history_offset", - "window_lead_time", "interpolation_limit", "interpolation_method", "variables", "window_history_end"] + _hash = ["station", "statistics_per_var", "data_origin", "sampling", "target_dim", "target_var", "time_dim", + "iter_dim", "window_dim", "window_history_size", "window_history_offset", "window_lead_time", + "interpolation_limit", "interpolation_method", "variables", "window_history_end"] - def __init__(self, station, data_path, statistics_per_var=None, station_type=DEFAULT_STATION_TYPE, - network=DEFAULT_NETWORK, sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING, + def __init__(self, station, data_path, statistics_per_var=None, sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING, target_dim=DEFAULT_TARGET_DIM, target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM, iter_dim=DEFAULT_ITER_DIM, window_dim=DEFAULT_WINDOW_DIM, window_history_size=DEFAULT_WINDOW_HISTORY_SIZE, window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET, @@ -87,8 +85,6 @@ class DataHandlerSingleStation(AbstractDataHandler): self.input_data, self.target_data = None, None self._transformation = self.setup_transformation(transformation) - self.station_type = station_type - self.network = network self.sampling = sampling self.target_dim = target_dim self.target_var = target_var @@ -140,9 +136,8 @@ class DataHandlerSingleStation(AbstractDataHandler): return self._data.shape, self.get_X().shape, self.get_Y().shape def __repr__(self): - return f"StationPrep(station={self.station}, data_path='{self.path}', " \ + return f"StationPrep(station={self.station}, data_path='{self.path}', data_origin={self.data_origin}, " \ f"statistics_per_var={self.statistics_per_var}, " \ - f"station_type='{self.station_type}', network='{self.network}', " \ f"sampling='{self.sampling}', target_dim='{self.target_dim}', target_var='{self.target_var}', " \ f"time_dim='{self.time_dim}', window_history_size={self.window_history_size}, " \ f"window_lead_time={self.window_lead_time}, interpolation_limit={self.interpolation_limit}, " \ @@ -169,8 +164,12 @@ class DataHandlerSingleStation(AbstractDataHandler): return self.get_transposed_label() def get_coordinates(self): - coords = self.meta.loc[["station_lon", "station_lat"]].astype(float) - return coords.rename(index={"station_lon": "lon", "station_lat": "lat"}).to_dict()[str(self)] + try: + coords = self.meta.loc[["station_lon", "station_lat"]].astype(float) + coords = coords.rename(index={"station_lon": "lon", "station_lat": "lat"}) + except KeyError: + coords = self.meta.loc[["lon", "lat"]].astype(float) + return coords.to_dict()[str(self)] def call_transform(self, inverse=False): opts_input = self._transformation[0] @@ -301,8 +300,7 @@ class DataHandlerSingleStation(AbstractDataHandler): def make_input_target(self): data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, 
self.sampling, - self.station_type, self.network, self.store_data_locally, self.data_origin, - self.start, self.end) + self.store_data_locally, self.data_origin, self.start, self.end) self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit, sampling=self.sampling) self.set_inputs_and_targets() @@ -320,8 +318,8 @@ class DataHandlerSingleStation(AbstractDataHandler): self.make_observation(self.target_dim, self.target_var, self.time_dim) self.remove_nan(self.time_dim) - def load_data(self, path, station, statistics_per_var, sampling, station_type=None, network=None, - store_data_locally=False, data_origin: Dict = None, start=None, end=None): + def load_data(self, path, station, statistics_per_var, sampling, store_data_locally=False, + data_origin: Dict = None, start=None, end=None): """ Load data and meta data either from local disk (preferred) or download new data by using a custom download method. @@ -339,23 +337,20 @@ class DataHandlerSingleStation(AbstractDataHandler): if os.path.exists(meta_file): os.remove(meta_file) data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling, - station_type=station_type, network=network, store_data_locally=store_data_locally, data_origin=data_origin, - time_dim=self.time_dim, target_dim=self.target_dim, - iter_dim=self.iter_dim) + time_dim=self.time_dim, target_dim=self.target_dim, iter_dim=self.iter_dim) logging.debug(f"loaded new data") else: try: logging.debug(f"try to load local data from: {file_name}") data = xr.open_dataarray(file_name) meta = pd.read_csv(meta_file, index_col=0) - self.check_station_meta(meta, station, station_type, network, data_origin) + self.check_station_meta(meta, station, data_origin, statistics_per_var) logging.debug("loading finished") except FileNotFoundError as e: logging.debug(e) logging.debug(f"load new data") data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling, - station_type=station_type, network=network, store_data_locally=store_data_locally, data_origin=data_origin, time_dim=self.time_dim, target_dim=self.target_dim, iter_dim=self.iter_dim) @@ -366,9 +361,8 @@ class DataHandlerSingleStation(AbstractDataHandler): return data, meta def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling, - station_type=None, network=None, store_data_locally=True, data_origin: Dict = None, - time_dim=DEFAULT_TIME_DIM, target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) \ - -> [xr.DataArray, pd.DataFrame]: + store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM, + target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) -> [xr.DataArray, pd.DataFrame]: """ Download data from TOAR database using the JOIN interface or load local era5 data. 
@@ -381,31 +375,36 @@ class DataHandlerSingleStation(AbstractDataHandler): :return: downloaded data and its meta data """ df_all = {} - df_era5, df_join = None, None - meta_era5, meta_join = None, None + df_era5, df_toar = None, None + meta_era5, meta_toar = None, None if data_origin is not None: era5_origin = filter_dict_by_value(data_origin, "era5", True) era5_stats = select_from_dict(statistics_per_var, era5_origin.keys()) - join_origin = filter_dict_by_value(data_origin, "era5", False) - join_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False) - assert len(era5_origin) + len(join_origin) == len(data_origin) - assert len(era5_stats) + len(join_stats) == len(statistics_per_var) + toar_origin = filter_dict_by_value(data_origin, "era5", False) + toar_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False) + assert len(era5_origin) + len(toar_origin) == len(data_origin) + assert len(era5_stats) + len(toar_stats) == len(statistics_per_var) else: - era5_origin, join_origin = None, None - era5_stats, join_stats = statistics_per_var, statistics_per_var + era5_origin, toar_origin = None, None + era5_stats, toar_stats = statistics_per_var, statistics_per_var # load data if era5_origin is not None and len(era5_stats) > 0: # load era5 data - df_era5, meta_era5 = era5.load_era5(station_name=station, stat_var=era5_stats, sampling=sampling, - data_origin=era5_origin) - if join_origin is None or len(join_stats) > 0: - # load join data - df_join, meta_join = join.download_join(station_name=station, stat_var=join_stats, station_type=station_type, - network_name=network, sampling=sampling, data_origin=join_origin) - df = pd.concat([df_era5, df_join], axis=1, sort=True) - meta = meta_era5 if meta_era5 is not None else meta_join + df_era5, meta_era5 = data_sources.era5.load_era5(station_name=station, stat_var=era5_stats, + sampling=sampling, data_origin=era5_origin) + if toar_origin is None or len(toar_stats) > 0: + # load combined data from toar-data (v2 & v1) + df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats, + sampling=sampling, data_origin=toar_origin) + + df = pd.concat([df_era5, df_toar], axis=1, sort=True) + if meta_era5 is not None and meta_toar is not None: + meta = meta_era5.combine_first(meta_toar) + else: + meta = meta_era5 if meta_era5 is not None else meta_toar meta.loc["data_origin"] = str(data_origin) + meta.loc["statistics_per_var"] = str(statistics_per_var) df_all[station[0]] = df # convert df_all to xarray @@ -418,22 +417,21 @@ class DataHandlerSingleStation(AbstractDataHandler): return xarr, meta @staticmethod - def check_station_meta(meta, station, station_type, network, data_origin): + def check_station_meta(meta, station, data_origin, statistics_per_var): """ Search for the entries in meta data and compare the value with the requested values. Will raise a FileNotFoundError if the values mismatch. """ - if station_type is not None: - check_dict = {"station_type": station_type, "network_name": network, "data_origin": str(data_origin)} - for (k, v) in check_dict.items(): - if v is None or k not in meta: - continue - if meta.at[k, station[0]] != v: - logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != " - f"{meta.at[k, station[0]]} (local). 
Raise FileNotFoundError to trigger new " - f"grapping from web.") - raise FileNotFoundError + check_dict = {"data_origin": str(data_origin), "statistics_per_var": str(statistics_per_var)} + for (k, v) in check_dict.items(): + if v is None or k not in meta.index: + continue + if meta.at[k, station[0]] != v: + logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != " + f"{meta.at[k, station[0]]} (local). Raise FileNotFoundError to trigger new " + f"grapping from web.") + raise FileNotFoundError def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray: """ diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py index 47ccc5510c8135745c518611504cd02900a1f883..e5760e9afb52f9d55071214fb632601d744f124e 100644 --- a/mlair/data_handler/data_handler_with_filter.py +++ b/mlair/data_handler/data_handler_with_filter.py @@ -68,8 +68,7 @@ class DataHandlerFilterSingleStation(DataHandlerSingleStation): def make_input_target(self): data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, - self.station_type, self.network, self.store_data_locally, self.data_origin, - self.start, self.end) + self.store_data_locally, self.data_origin, self.start, self.end) self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit) self.set_inputs_and_targets() diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index 300e0435c4e8441e299675319e2c72604ebb3200..ae46ad918630f2a5f083c62b558609e85d7cc2d8 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -22,7 +22,7 @@ import xarray as xr from mlair.data_handler.abstract_data_handler import AbstractDataHandler from mlair.helpers import remove_items, to_list, TimeTrackingWrapper -from mlair.helpers.join import EmptyQueryResult +from mlair.helpers.data_sources.toar_data import EmptyQueryResult number = Union[float, int] @@ -168,7 +168,7 @@ class DefaultDataHandler(AbstractDataHandler): dim = self.time_dim intersect = reduce(np.intersect1d, map(lambda x: x.coords[dim].values, X_original)) if len(intersect) < max(self.min_length, 1): - X, Y = None, None + raise ValueError(f"There is no intersection of X.") else: X = list(map(lambda x: x.sel({dim: intersect}), X_original)) Y = Y_original.sel({dim: intersect}) @@ -205,10 +205,6 @@ class DefaultDataHandler(AbstractDataHandler): if True only extract values larger than extreme_values :param timedelta: used as arguments for np.timedelta in order to mark extreme values on datetime """ - # check if X or Y is None - if (self._X is None) or (self._Y is None): - logging.debug(f"{str(self.id_class)} has no data for X or Y, skip multiply extremes") - return if extreme_values is None: logging.debug(f"No extreme values given, skip multiply extremes") self._X_extreme, self._Y_extreme = self._X, self._Y diff --git a/mlair/helpers/data_sources/__init__.py b/mlair/helpers/data_sources/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..6b753bc3afb961be65ff0f934ef4f0de08804a0b --- /dev/null +++ b/mlair/helpers/data_sources/__init__.py @@ -0,0 +1,10 @@ +""" +Data Sources. + +The module data_sources collects different data sources, namely ERA5, TOAR-Data v1 (JOIN), and TOAR-Data v2 +""" + +__author__ = "Lukas Leufen" +__date__ = "2022-07-05" + +from . 
import era5, join, toar_data, toar_data_v2 diff --git a/mlair/helpers/era5.py b/mlair/helpers/data_sources/era5.py similarity index 57% rename from mlair/helpers/era5.py rename to mlair/helpers/data_sources/era5.py index 8cf967a2c0ae65ff76d9464433266c2fdfc6ef4f..cd569f3a77871a2566c8962680122b8fc7faed42 100644 --- a/mlair/helpers/era5.py +++ b/mlair/helpers/data_sources/era5.py @@ -5,14 +5,14 @@ __date__ = "2022-06-09" import logging import os -import numpy as np import pandas as pd import xarray as xr from mlair import helpers from mlair.configuration.era5_settings import era5_settings -from mlair.configuration.join_settings import join_settings -from mlair.helpers.join import load_meta_data, EmptyQueryResult +from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings +from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data, correct_timezone +from mlair.helpers.data_sources.toar_data import EmptyQueryResult from mlair.helpers.meteo import relative_humidity_from_dewpoint @@ -30,15 +30,16 @@ def load_era5(station_name, stat_var, sampling, data_origin): else: raise ValueError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.") - # get data connection settings - # load series information (lat/lon) from join database - join_url_base, headers = join_settings() - meta = load_meta_data(station_name, None, None, join_url_base, headers) + # load station meta using toar-data v2 API + meta_url_base, headers = toar_data_v2_settings("meta") + station_meta = load_station_information(station_name, meta_url_base, headers) # sel data for station using sel method nearest - data = xr.open_mfdataset(os.path.join(data_path, file_names)) - station_dask = data.sel(lon=meta["station_lon"], lat=meta["station_lat"], method="nearest", drop=True) - station_data = station_dask.to_array().T.compute() + logging.info(f"load data for {station_meta['codes'][0]} from ERA5") + with xr.open_mfdataset(os.path.join(data_path, file_names)) as data: + lon, lat = station_meta["coordinates"]["lng"], station_meta["coordinates"]["lat"] + station_dask = data.sel(lon=lon, lat=lat, method="nearest", drop=True) + station_data = station_dask.to_array().T.compute() # transform data and meta to pandas station_data = station_data.to_pandas() @@ -55,10 +56,23 @@ def load_era5(station_name, stat_var, sampling, data_origin): else: station_data = station_data[stat_var] - meta = pd.DataFrame.from_dict(meta, orient="index", columns=station_name) + # convert to local timezone + station_data = correct_timezone(station_data, station_meta, sampling) + + variable_meta = _emulate_meta_data(station_data) + meta = combine_meta_data(station_meta, variable_meta) + meta = pd.DataFrame.from_dict(meta, orient='index') + meta.columns = station_name return station_data, meta +def _emulate_meta_data(station_data): + general_meta = {"sampling_frequency": "hourly", "data_origin": "model", "data_origin_type": "model"} + roles_meta = {"roles": [{"contact": {"organisation": {"name": "ERA5", "longname": "ECMWF"}}}]} + variable_meta = {var: {"variable": {"name": var}, **roles_meta, ** general_meta} for var in station_data.columns} + return variable_meta + + def _rename_era5_variables(era5_names): mapper = {"SP": "press", "U10M": "u", "V10M": "v", "T2M": "temp", "D2M": "dew", "BLH": "pblheight", "TCC": "cloudcover", "RHw": "relhum"} diff --git a/mlair/helpers/join.py b/mlair/helpers/data_sources/join.py similarity index 66% rename from mlair/helpers/join.py rename to 
mlair/helpers/data_sources/join.py index 9c020b39e1b16b8d6682d61c160cbba12c067221..df3b358ed187d4c6460f3d40b7ec20599526b86d 100644 --- a/mlair/helpers/join.py +++ b/mlair/helpers/data_sources/join.py @@ -4,36 +4,27 @@ __date__ = '2019-10-16' import datetime as dt import logging -from typing import Iterator, Union, List, Dict +from typing import Iterator, Union, List, Dict, Tuple import pandas as pd -import requests -from requests.adapters import HTTPAdapter -from requests.packages.urllib3.util.retry import Retry from mlair import helpers from mlair.configuration.join_settings import join_settings +from mlair.helpers.data_sources import toar_data, toar_data_v2 + # join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/' str_or_none = Union[str, None] -class EmptyQueryResult(Exception): - """Exception that get raised if a query to JOIN returns empty results.""" - - pass - - def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None, - network_name: str = None, sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame, - pd.DataFrame]: + sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame, pd.DataFrame]: """ Read data from JOIN/TOAR. :param station_name: Station name e.g. DEBY122 :param stat_var: key as variable like 'O3', values as statistics on keys like 'mean' :param station_type: set the station type like "traffic" or "background", can be none - :param network_name: set the measurement network like "UBA" or "AIRBASE", can be none :param sampling: sampling rate of the downloaded data, either set to daily or hourly (default daily) :param data_origin: additional dictionary to specify data origin as key (for variable) value (origin) pair. Valid origins are "REA" for reanalysis data and "" (empty string) for observational data. 
@@ -43,25 +34,23 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t # make sure station_name parameter is a list station_name = helpers.to_list(station_name) - # also ensure that given data_origin dict is no reference - if data_origin is None or len(data_origin) == 0: - data_origin = None - else: - data_origin = {k: v for (k, v) in data_origin.items()} + # split network and origin information + data_origin, network_name = split_network_and_origin(data_origin) # get data connection settings join_url_base, headers = join_settings(sampling) # load series information vars_dict, data_origin = load_series_information(station_name, station_type, network_name, join_url_base, headers, - data_origin) + data_origin, stat_var) # check if all requested variables are available if set(stat_var).issubset(vars_dict) is False: missing_variables = set(stat_var).difference(vars_dict) origin = helpers.select_from_dict(data_origin, missing_variables) options = f"station={station_name}, type={station_type}, network={network_name}, origin={origin}" - raise EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in JOIN.") + raise toar_data.EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in " + f"JOIN.") # correct stat_var values if data is not aggregated (hourly) if sampling == "hourly": @@ -70,6 +59,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t # download all variables with given statistic data = None df = None + meta = {} logging.info(f"load data for {station_name[0]} from JOIN") for var in _lower_list(sorted(vars_dict.keys())): if var in stat_var.keys(): @@ -81,7 +71,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t 'sampling': sampling, 'capture': 0, 'format': 'json'} # load data - data = get_data(opts, headers) + data = toar_data.get_data(opts, headers) # adjust data format if given as list of list # no branch cover because this just happens when downloading hourly data using a secret token, not available @@ -94,15 +84,94 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t # store data in pandas dataframe df = _save_to_pandas(df, data, stat, var) + meta[var] = _correct_meta(data["metadata"]) logging.debug('finished: {}'.format(var)) if data: - meta = pd.DataFrame.from_dict(data['metadata'], orient='index') + # load station meta using toar-data v2 API and convert to local timezone + meta_url_base, headers = toar_data_v2.toar_data_v2_settings("meta") + station_meta = toar_data_v2.load_station_information(station_name, meta_url_base, headers) + df = toar_data_v2.correct_timezone(df, station_meta, sampling) + + # create meta data + meta = toar_data_v2.combine_meta_data(station_meta, meta) + meta = pd.DataFrame.from_dict(meta, orient='index') meta.columns = station_name return df, meta else: - raise EmptyQueryResult("No data found in JOIN.") + raise toar_data.EmptyQueryResult("No data found in JOIN.") + + +def _correct_meta(meta): + meta_out = {} + for k, v in meta.items(): + if k.startswith("station"): + _k = k.split("_", 1)[1] + _d = meta_out.get("station", {}) + _d[_k] = v + meta_out["station"] = _d + elif k.startswith("parameter"): + _k = k.split("_", 1)[1] + _d = meta_out.get("variable", {}) + _d[_k] = v + meta_out["variable"] = _d + elif k == "network_name": + if v == "AIRBASE": + _d = {"name": "EEA", "longname": "European Environment Agency", "kind": "government"} + elif v == "UBA": + _d = 
{"name": "UBA", "longname": "Umweltbundesamt", "kind": "government", "country": "Germany"} + else: + _d = {"name": v} + meta_out["roles"] = [{"contact": {"organisation": _d}}] + elif k in ["google_resolution", "numid"]: + continue + else: + meta_out[k] = v + return meta_out + + +def split_network_and_origin(origin_network_dict: dict) -> Tuple[Union[None, dict], Union[None, dict]]: + """ + Split given dict into network and data origin. + + Method is required to transform Toar-Data v2 structure (using only origin) into Toar-Data v1 (JOIN) structure (which + uses origin and network parameter). Furthermore, EEA network (v2) is renamed to AIRBASE (v1). + """ + if origin_network_dict is None or len(origin_network_dict) == 0: + data_origin, network = None, None + else: + data_origin = {} + network = {} + for k, v in origin_network_dict.items(): + network[k] = [] + for _network in helpers.to_list(v): + if _network.lower() == "EEA".lower(): + network[k].append("AIRBASE") + elif _network.lower() != "REA".lower(): + network[k].append(_network) + if "REA" in v: + data_origin[k] = "REA" + else: + data_origin[k] = "" + network[k] = filter_network(network[k]) + return data_origin, network + + +def filter_network(network: list) -> Union[list, None]: + """ + Filter given list of networks. + + :param network: list of various network names (can contain duplicates) + :return: sorted list with unique entries + """ + sorted_network = [] + for v in list(filter(lambda x: x != "", network)): + if v not in sorted_network: + sorted_network.append(v) + if len(sorted_network) == 0: + sorted_network = None + return sorted_network def correct_data_format(data): @@ -125,57 +194,8 @@ def correct_data_format(data): return formatted -def get_data(opts: Dict, headers: Dict) -> Union[Dict, List]: - """ - Download join data using requests framework. - - Data is returned as json like structure. Depending on the response structure, this can lead to a list or dictionary. 
- - :param opts: options to create the request url - :param headers: additional headers information like authorization, can be empty - - :return: requested data (either as list or dictionary) - """ - url = create_url(**opts) - response = retries_session().get(url, headers=headers, timeout=(5, None)) # timeout=(open, read) - if response.status_code == 200: - return response.json() - else: - raise EmptyQueryResult(f"There was an error (STATUS {response.status_code}) for request {url}") - - -def retries_session(max_retries=3): - retry_strategy = Retry(total=max_retries, - backoff_factor=0.1, - status_forcelist=[429, 500, 502, 503, 504], - method_whitelist=["HEAD", "GET", "OPTIONS"]) - adapter = HTTPAdapter(max_retries=retry_strategy) - http = requests.Session() - http.mount("https://", adapter) - http.mount("http://", adapter) - return http - - -def load_meta_data(station_name: List[str], station_type: str_or_none, network_name: str_or_none, - join_url_base: str, headers: Dict) -> [Dict, Dict]: - opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type, - "network_name": network_name, "as_dict": "true", - "columns": "station_id,network_name,station_local_id,station_type,station_type_of_area,station_category," - "station_name,station_country,station_state,station_lon,station_lat,station_alt," - "station_timezone,station_nightlight_5km,station_climatic_zone,station_wheat_production," - "station_rice_production,station_nox_emissions,station_omi_no2_column,station_toar_category," - "station_htap_region,station_reported_alt,station_alt_flag,station_coordinate_status," - "station_google_alt,station_etopo_alt,station_etopo_min_alt_5km,station_etopo_relative_alt," - "station_dominant_landcover,station_landcover_description,station_max_nightlight_25km," - "station_max_population_density_25km,station_nightlight_1km,station_population_density," - "google_resolution,station_comments,station_max_population_density_5km"} - if network_name is None: - opts["columns"] = opts["columns"].replace(",network_name", "") - return get_data(opts, headers)[-1] - - def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none, - join_url_base: str, headers: Dict, data_origin: Dict = None) -> [Dict, Dict]: + join_url_base: str, headers: Dict, data_origin: Dict = None, stat_var: Dict = None) -> [Dict, Dict]: """ List all series ids that are available for given station id and network name. @@ -189,24 +209,49 @@ def load_series_information(station_name: List[str], station_type: str_or_none, :return: all available series for requested station stored in an dictionary with parameter name (variable) as key and the series id as value. 
""" - network_name_opts = network_name if network_name is None else ",".join(helpers.to_list(network_name)) + network_name_opts = _create_network_name_opts(network_name) + parameter_name_opts = _create_parameter_name_opts(stat_var) opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type, - "network_name": network_name_opts, "as_dict": "true", + "network_name": network_name_opts, "as_dict": "true", "parameter_name": parameter_name_opts, "columns": "id,network_name,station_id,parameter_name,parameter_label,parameter_attribute"} - station_vars = get_data(opts, headers) + station_vars = toar_data.get_data(opts, headers) logging.debug(f"{station_name}: {station_vars}") return _select_distinct_series(station_vars, data_origin, network_name) -def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_name: Union[str, List[str]] = None) -> \ - [Dict, Dict]: +def _create_parameter_name_opts(stat_var): + if stat_var is None: + parameter_name_opts = None + else: + parameter_name_opts = ",".join(stat_var.keys()) + return parameter_name_opts + + +def _create_network_name_opts(network_name): + if network_name is None: + network_name_opts = network_name + elif isinstance(network_name, list): + network_name_opts = ",".join(helpers.to_list(network_name)) + elif isinstance(network_name, dict): + _network = [] + for v in network_name.values(): + _network.extend(helpers.to_list(v)) + network_name_opts = ",".join(filter(lambda x: x is not None, set(_network))) + network_name_opts = None if len(network_name_opts) == 0 else network_name_opts + else: + raise TypeError(f"network_name parameter must be of type None, list, or dict. Given is {type(network_name)}.") + return network_name_opts + + +def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_name: Union[str, List[str]] = None) \ + -> [Dict, Dict]: """ Select distinct series ids for all variables. Also check if a parameter is from REA or not. """ data_origin = {} if data_origin is None else data_origin selected, data_origin = _select_distinct_data_origin(vars, data_origin) - network_name = [] if network_name is None else helpers.to_list(network_name) + network_name = [] if network_name is None else network_name selected = _select_distinct_network(selected, network_name) # extract id @@ -214,7 +259,7 @@ def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_ return selected, data_origin -def _select_distinct_network(vars: dict, network_name: list) -> dict: +def _select_distinct_network(vars: dict, network_name: Union[list, dict]) -> dict: """ Select distinct series regarding network name. The order the network names are provided in parameter `network_name` indicates priority (from high to low). If no network name is provided, first entry is used and a logging info is @@ -227,20 +272,23 @@ def _select_distinct_network(vars: dict, network_name: list) -> dict: indicating to use always first candidate for each variable. 
:return: dictionary with single series reference for each variable """ + if isinstance(network_name, (list, str)): + network_name = {var: helpers.to_list(network_name) for var in vars.keys()} selected = {} for var, series in vars.items(): res = [] - for network in network_name: + network_list = helpers.to_list(network_name.get(var, []) or []) + for network in network_list: res.extend(list(filter(lambda x: x["network_name"].upper() == network.upper(), series))) if len(res) > 0: # use first match which has the highest priority selected[var] = res[0] else: - if len(network_name) == 0: # just print message which network is used if none is provided + if len(network_list) == 0: # just print message which network is used if none is provided selected[var] = series[0] - logging.info(f"Could not find a valid match for variable {var} and networks {network_name}! " - f"Therefore, use first answer from JOIN: {series[0]}") + logging.info(f"Could not find a valid match for variable {var} and networks {network_name.get(var, [])}" + f"! Therefore, use first answer from JOIN: {series[0]}") else: # raise error if network name is provided but no match could be found - raise ValueError(f"Cannot find a valid match for requested networks {network_name} and " + raise ValueError(f"Cannot find a valid match for requested networks {network_name.get(var, [])} and " f"variable {var} as only following networks are available in JOIN: " f"{list(map(lambda x: x['network_name'], series))}") return selected @@ -322,22 +370,6 @@ def _lower_list(args: List[str]) -> Iterator[str]: yield string.lower() -def create_url(base: str, service: str, **kwargs: Union[str, int, float, None]) -> str: - """ - Create a request url with given base url, service type and arbitrarily many additional keyword arguments. - - :param base: basic url of the rest service - :param service: service type, e.g. series, stats - :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum' - - :return: combined url as string - """ - if not base.endswith("/"): - base += "/" - url = f"{base}{service}/?{'&'.join(f'{k}={v}' for k, v in kwargs.items() if v is not None)}" - return url - - if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) var_all_dic = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values', diff --git a/mlair/helpers/data_sources/toar_data.py b/mlair/helpers/data_sources/toar_data.py new file mode 100644 index 0000000000000000000000000000000000000000..9f9c595dac8b6b33d2d656519107baee3f648a8c --- /dev/null +++ b/mlair/helpers/data_sources/toar_data.py @@ -0,0 +1,111 @@ +__author__ = "Lukas Leufen" +__date__ = "2022-07-05" + + +from typing import Union, List, Dict + +from . import join, toar_data_v2 + +import requests +from requests.adapters import HTTPAdapter +from requests.packages.urllib3.util.retry import Retry +import pandas as pd + + +class EmptyQueryResult(Exception): + """Exception that get raised if a query to JOIN returns empty results.""" + + pass + + +def create_url(base: str, service: str, param_id: Union[str, int, None] = None, + **kwargs: Union[str, int, float, None]) -> str: + """ + Create a request url with given base url, service type and arbitrarily many additional keyword arguments. + + :param base: basic url of the rest service + :param service: service type, e.g. series, stats + :param param_id: id for a distinct service, is added between ending / of service and ? of kwargs + :param kwargs: keyword pairs for optional request specifications, e.g. 
'statistics=maximum' + + :return: combined url as string + """ + url = f"{base}" + if not url.endswith("/"): + url += "/" + if service is not None: + url = f"{url}{service}" + if not url.endswith("/"): + url += "/" + if param_id is not None: + url = f"{url}{param_id}" + if len(kwargs) > 0: + url = f"{url}?{'&'.join(f'{k}={v}' for k, v in kwargs.items() if v is not None)}" + return url + + +def get_data(opts: Dict, headers: Dict, as_json: bool = True) -> Union[Dict, List, str]: + """ + Download join data using requests framework. + + Data is returned as json like structure. Depending on the response structure, this can lead to a list or dictionary. + + :param opts: options to create the request url + :param headers: additional headers information like authorization, can be empty + :param as_json: extract response as json if true (default True) + + :return: requested data (either as list or dictionary) + """ + url = create_url(**opts) + response = retries_session().get(url, headers=headers, timeout=(5, None)) # timeout=(open, read) + if response.status_code == 200: + return response.json() if as_json is True else response.text + else: + raise EmptyQueryResult(f"There was an error (STATUS {response.status_code}) for request {url}") + + +def retries_session(max_retries=3): + retry_strategy = Retry(total=max_retries, + backoff_factor=0.1, + status_forcelist=[429, 500, 502, 503, 504], + method_whitelist=["HEAD", "GET", "OPTIONS"]) + adapter = HTTPAdapter(max_retries=retry_strategy) + http = requests.Session() + http.mount("https://", adapter) + http.mount("http://", adapter) + return http + + +def download_toar(station, toar_stats, sampling, data_origin): + + try: + # load data from toar-data (v2) + df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=data_origin) + except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError, IndexError): + df_toar, meta_toar = None, None + + try: + # load join data (toar-data v1) + df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling, + data_origin=data_origin) + except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError, IndexError): + df_join, meta_join = None, None + + # merge both data sources with priority on toar-data v2 + if df_toar is not None and df_join is not None: + df_merged = merge_toar_join(df_toar, df_join, sampling) + meta_merged = meta_toar + else: + df_merged = df_toar if df_toar is not None else df_join + meta_merged = meta_toar if df_toar is not None else meta_join + return df_merged, meta_merged + + +def merge_toar_join(df_toar, df_join, sampling): + start_date = min([df_toar.index.min(), df_join.index.min()]) + end_date = max([df_toar.index.max(), df_join.index.max()]) + freq = {"hourly": "1H", "daily": "1d"}.get(sampling) + full_time = pd.date_range(start_date, end_date, freq=freq) + full_data = df_toar.reindex(full_time) + full_data.update(df_join, overwrite=False) + return full_data diff --git a/mlair/helpers/data_sources/toar_data_v2.py b/mlair/helpers/data_sources/toar_data_v2.py new file mode 100644 index 0000000000000000000000000000000000000000..a592d2d16e6873ae8dfcef9b27698aaeca35ccb4 --- /dev/null +++ b/mlair/helpers/data_sources/toar_data_v2.py @@ -0,0 +1,238 @@ +"""Functions to access https://toar-data.fz-juelich.de/api/v2/""" +__author__ = 'Lukas Leufen' +__date__ = '2022-06-30' + + +import logging +from typing import Union, List, Dict +from io import StringIO + +import 
pandas as pd +import pytz +from timezonefinder import TimezoneFinder + +from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings +from mlair.helpers import to_list +from mlair.helpers.data_sources.toar_data import EmptyQueryResult, get_data +from mlair.helpers.data_sources import join + + +str_or_none = Union[str, None] + + +def download_toar(station_name: Union[str, List[str]], stat_var: dict, + sampling: str = "daily", data_origin: Dict = None): + """ + Download data from https://toar-data.fz-juelich.de/api/v2/ + + Uses station name to indicate measurement site and keys of stat_var to indicate variable name. If data origin is + given, this method tries to load time series for this origin. In case no origin is provided, this method loads data + with the highest priority according to toar-data's order parameter. + + :param station_name: + :param stat_var: + :param sampling: + :param data_origin: + :return: + """ + + # make sure station_name parameter is a list + station_name = to_list(station_name) + + # also ensure that given data_origin dict is no reference + if data_origin is None or len(data_origin) == 0: + data_origin = None + else: + data_origin = {k: v for (k, v) in data_origin.items()} + + # get data connection settings for meta + meta_url_base, headers = toar_data_v2_settings("meta") + + # load variables + var_meta = load_variables_information(stat_var, meta_url_base, headers) + + # load station meta + station_meta = load_station_information(station_name, meta_url_base, headers) + + # load series information + timeseries_meta = load_timeseries_information(station_meta, var_meta, meta_url_base, headers, data_origin) + + # # correct stat_var values if data is not aggregated (hourly) + # if sampling == "hourly": + # stat_var = {key: "values" for key in stat_var.keys()} + + logging.info(f"load data for {station_meta['codes'][0]} from TOAR-DATA") + # get data connection settings for data + data_url_base, headers = toar_data_v2_settings(sampling) + + data_dict = {} + for var, meta in timeseries_meta.items(): + logging.debug(f"load {var}") + meta_and_opts = prepare_meta(meta, sampling, stat_var, var) + data_var = [] + for var_meta, opts in meta_and_opts: + data_var.extend(load_timeseries_data(var_meta, data_url_base, opts, headers, sampling)) + data_dict[var] = merge_data(*data_var, sampling=sampling) + data = pd.DataFrame.from_dict(data_dict) + data = correct_timezone(data, station_meta, sampling) + + meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()}) + meta = pd.DataFrame.from_dict(meta, orient='index') + meta.columns = station_name + return data, meta + + +def merge_data(*args, sampling="hourly"): + start_date = min(map(lambda x: x.index.min(), args)) + end_date = max(map(lambda x: x.index.max(), args)) + freq = {"hourly": "1H", "daily": "1d"}.get(sampling) + full_time = pd.date_range(start_date, end_date, freq=freq) + full_data = args[0].reindex(full_time) + if not isinstance(full_data, pd.DataFrame): + full_data = full_data.to_frame() + for d in args[1:]: + full_data.update(d, overwrite=False) + return full_data.squeeze() + + +def correct_timezone(data, meta, sampling): + """ + Extract timezone information and convert data index to this timezone. + + Uses UTC if no information is provided. Note that is method only modifies data in with sampling='hourly'. In all + other cases, it returns just the given data without any change. This method expects date index of data to be in UTC. 
+ Timezone information is not added to the index to get rid of daylight saving time and ambiguous timestamps. + """ + if sampling == "hourly": + tz_info = meta.get("timezone", "UTC") + try: + tz = pytz.timezone(tz_info) + except pytz.exceptions.UnknownTimeZoneError as e: + lon, lat = meta["coordinates"]["lng"], meta["coordinates"]["lat"] + tz = pytz.timezone(TimezoneFinder().timezone_at(lng=lon, lat=lat)) + index = data.index + index = index.tz_localize(None) + utc_offset = tz.utcoffset(index[0]) - tz.dst(index[0]) + data.index = index + utc_offset + return data + + +def prepare_meta(meta, sampling, stat_var, var): + out = [] + for m in meta: + opts = {} + if sampling == "daily": + opts["timeseries_id"] = m.pop("id") + m["id"] = None + opts["names"] = stat_var[var] + opts["sampling"] = sampling + out.append(([m], opts)) + return out + + +def combine_meta_data(station_meta, timeseries_meta): + meta = {} + for k, v in station_meta.items(): + if k == "codes": + meta[k] = v[0] + elif k in ["coordinates", "additional_metadata", "globalmeta"]: + for _key, _val in v.items(): + if _key == "lng": + meta["lon"] = _val + else: + meta[_key] = _val + elif k in ["changelog", "roles", "annotations", "aux_images", "aux_docs", "aux_urls"]: + continue + else: + meta[k] = v + for var, var_meta in timeseries_meta.items(): + for k, v in var_meta.items(): + if k in ["additional_metadata", "station", "programme", "annotations", "changelog"]: + continue + elif k == "roles": + for _key, _val in v[0]["contact"]["organisation"].items(): + new_k = f"{var}_organisation_{_key}" + meta[new_k] = _val + elif k == "variable": + for _key, _val in v.items(): + new_k = f"{var}_{_key}" + meta[new_k] = _val + else: + new_k = f"{var}_{k}" + meta[new_k] = v + return meta + + +def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling): + coll = [] + for meta in timeseries_meta: + series_id = meta["id"] + # opts = {"base": url_base, "service": f"data/timeseries/{series_id}"} + opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv", **opts} + if sampling != "hourly": + opts["service"] = None + res = get_data(opts, headers, as_json=False) + data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True, + infer_datetime_format=True) + data = data[join._correct_stat_name(opts.get("names", "value"))].rename(meta["variable"]["name"]) + coll.append(data) + return coll + + +def load_station_information(station_name: List[str], url_base: str, headers: Dict): + # opts = {"base": url_base, "service": f"stationmeta/{station_name[0]}"} + opts = {"base": url_base, "service": f"stationmeta", "param_id": station_name[0]} + return get_data(opts, headers) + + +def load_timeseries_information(station_meta, var_meta, url_base: str, headers: Dict, + data_origin: Dict = None) -> [Dict, Dict]: + timeseries_id_dict = {} + missing = [] + for var, meta in var_meta.items(): + timeseries_id_dict[var] = [] + opts = {"base": url_base, "service": "search", "station_id": station_meta["id"], "variable_id": meta["id"]} + res = get_data(opts, headers) + if len(res) == 0: + missing.append((var, meta)) + # raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} " + # f"({station_meta['codes'][0]}) and variable id {meta['id']} ({var}).") + if data_origin is not None: + var_origin = data_origin[var] + timeseries_id_dict[var] = select_timeseries_by_origin(res, var_origin) + # if len(timeseries_id_dict[var]) == 0: + # raise EmptyQueryResult(f"Cannot find any 
timeseries for station id {station_meta['id']} " + # f"({station_meta['codes'][0]}), variable id {meta['id']} ({var}) " + # f"and timeseries origin {var_origin}.") + if data_origin is None or len(timeseries_id_dict[var]) == 0: + timeseries_id_dict[var] = select_timeseries_by_order(res) + if len(missing) > 0: + missing = ",".join([f"{m[0]} ({m[1]['id']})" for m in missing]) + raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} " + f"({station_meta['codes'][0]}) and variables {missing}.") + return timeseries_id_dict + + +def select_timeseries_by_order(toar_meta): + order_dict = {meta["order"]: meta for meta in toar_meta} + res = [order_dict[order] for order in sorted(order_dict.keys())] + return res + + +def select_timeseries_by_origin(toar_meta, var_origin): + res = [] + for origin in to_list(var_origin): + for meta in toar_meta: + for roles in meta["roles"]: + if roles["contact"]["organisation"]["name"].lower() == origin.lower(): + res.append(meta) + break + return res + + +def load_variables_information(var_dict, url_base, headers): + var_meta_dict = {} + for var in var_dict.keys(): + opts = {"base": url_base, "service": f"variables", "param_id": var} + var_meta_dict[var] = get_data(opts, headers) + return var_meta_dict diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index 247c4fc9c7c6d57d721c1d0895cc8c719b1bd4a5..5fc3df951ed5dec9e94ed7d34d8dc02bafddf262 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -214,6 +214,7 @@ class ClimateFIRFilter(FIRFilter): h = [] if self.sel_opts is not None: self.sel_opts = self.sel_opts if isinstance(self.sel_opts, dict) else {self.time_dim: self.sel_opts} + self._check_sel_opts() sampling = {1: "1d", 24: "1H"}.get(int(self.fs)) logging.debug(f"{self.display_name}: create diurnal_anomalies") if self.apriori_diurnal is True and sampling == "1H": @@ -303,6 +304,10 @@ class ClimateFIRFilter(FIRFilter): except Exception as e: logging.info(f"Could not plot climate fir filter due to following reason:\n{e}") + def _check_sel_opts(self): + if len(self.data.sel(**self.sel_opts).coords[self.time_dim]) == 0: + raise ValueError(f"Abort {self.__class__.__name__} as no data is available after applying sel_opts to data") + @staticmethod def _next_order(order: list, minimum_length: Union[int, None], pos: int, window: Union[str, tuple]) -> int: next_order = 0 diff --git a/mlair/run_modules/experiment_setup.py b/mlair/run_modules/experiment_setup.py index d807db14c96a4a30fde791e54c8b1b32e519fb9c..5e7efde64dda069c57e2b7bb63faa8064f65a57d 100644 --- a/mlair/run_modules/experiment_setup.py +++ b/mlair/run_modules/experiment_setup.py @@ -10,7 +10,7 @@ from dill.source import getsource from mlair.configuration import path_config from mlair import helpers -from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_NETWORK, DEFAULT_STATION_TYPE, \ +from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_STATION_TYPE, \ DEFAULT_START, DEFAULT_END, DEFAULT_WINDOW_HISTORY_SIZE, DEFAULT_OVERWRITE_LOCAL_DATA, \ DEFAULT_HPC_LOGIN_LIST, DEFAULT_HPC_HOST_LIST, DEFAULT_CREATE_NEW_MODEL, DEFAULT_TRAIN_MODEL, \ DEFAULT_FRACTION_OF_TRAINING, DEFAULT_EXTREME_VALUES, DEFAULT_EXTREMES_ON_RIGHT_TAIL_ONLY, DEFAULT_PERMUTE_DATA, \ diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py index 00d82f3c6f48c3560e31d62b5bed4ddbd2bc49be..7dc61f89967b5992434e948fe2ba1fc37a4b651b 100644 --- a/mlair/run_modules/post_processing.py +++ 
b/mlair/run_modules/post_processing.py @@ -13,6 +13,7 @@ from typing import Dict, Tuple, Union, List, Callable import numpy as np import pandas as pd import xarray as xr +import datetime as dt from mlair.configuration import path_config from mlair.data_handler import Bootstraps, KerasIterator @@ -261,11 +262,17 @@ class PostProcessing(RunEnvironment): """Ensure time dimension to be equidistant. Sometimes dates if missing values have been dropped.""" start_data = data.coords[dim].values[0] freq = {"daily": "1D", "hourly": "1H"}.get(sampling) - datetime_index = pd.DataFrame(index=pd.date_range(start, end, freq=freq)) + _ind = pd.date_range(start, end, freq=freq) # two steps required to include all hours of end interval + datetime_index = pd.DataFrame(index=pd.date_range(_ind.min(), _ind.max() + dt.timedelta(days=1), closed="left", + freq=freq)) t = data.sel({dim: start_data}, drop=True) res = xr.DataArray(coords=[datetime_index.index, *[t.coords[c] for c in t.coords]], dims=[dim, *t.coords]) res = res.transpose(*data.dims) - res.loc[data.coords] = data + if data.shape == res.shape: + res.loc[data.coords] = data + else: + _d = data.sel({dim: slice(start, end)}) + res.loc[_d.coords] = _d return res def load_competitors(self, station_name: str) -> xr.DataArray: @@ -761,6 +768,7 @@ class PostProcessing(RunEnvironment): indicated by `station_name`. The name of the competitor is set in the `type` axis as indicator. This method will raise either a `FileNotFoundError` or `KeyError` if no competitor could be found for the given station. Either there is no file provided in the expected path or no forecast for given `competitor_name` in the forecast file. + Forecast is trimmed on interval start and end of test subset. :param station_name: name of the station to load data for :param competitor_name: name of the model @@ -769,10 +777,12 @@ class PostProcessing(RunEnvironment): path = os.path.join(self.competitor_path, competitor_name) file = os.path.join(path, f"forecasts_{station_name}_test.nc") with xr.open_dataarray(file) as da: - data = da.load() + data = da.load() forecast = data.sel(type=[self.forecast_indicator]) forecast.coords[self.model_type_dim] = [competitor_name] - return forecast + # limit forecast to time range of test subset + start, end = self.data_store.get("start", "test"), self.data_store.get("end", "test") + return self.create_full_time_dim(forecast, self.index_dim, self._sampling, start, end) def _create_observation(self, data, _, transformation_func: Callable, normalised: bool) -> xr.DataArray: """ diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py index 0e416acbca4d66d5844e1179c7653ac5a9934f28..7fb272f642aa0d7033cc1f28d4779dd6ad76e66e 100644 --- a/mlair/run_modules/pre_processing.py +++ b/mlair/run_modules/pre_processing.py @@ -18,7 +18,7 @@ import pandas as pd from mlair.data_handler import DataCollection, AbstractDataHandler from mlair.helpers import TimeTracking, to_list, tables from mlair.configuration import path_config -from mlair.helpers.join import EmptyQueryResult +from mlair.helpers.data_sources.toar_data import EmptyQueryResult from mlair.run_modules.run_environment import RunEnvironment @@ -114,8 +114,8 @@ class PreProcessing(RunEnvironment): +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+ """ - meta_cols = ['station_name', 'station_lon', 'station_lat', 'station_alt'] - meta_round = ["station_lon", "station_lat", "station_alt"] + meta_cols = 
["name", "lat", "lon", "alt", "country", "state", "type", "type_of_area", "toar1_category"] + meta_round = ["lat", "lon", "alt"] precision = 4 path = os.path.join(self.data_store.get("experiment_path"), "latex_report") path_config.check_path_and_create(path) @@ -402,8 +402,9 @@ def f_proc(data_handler, station, name_affix, store, return_strategy="", tmp_pat def f_proc_create_info_df(data, meta_cols): station_name = str(data.id_class) + meta = data.id_class.meta res = {"station_name": station_name, "Y_shape": data.get_Y()[0].shape[0], - "meta": data.id_class.meta.loc[meta_cols].values.flatten()} + "meta": meta.reindex(meta_cols).values.flatten()} return res diff --git a/requirements.txt b/requirements.txt index 3afc17b67fddbf5a269df1e1b7e103045630a290..353c0092237ef30ef21c3c225e899b8e21a823f0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,6 +28,7 @@ six==1.15.0 statsmodels==0.12.2 tabulate==0.8.9 tensorflow==2.5.0 +timezonefinder==5.2.0 toolz==0.11.2 typing_extensions==3.7.4.3 wget==3.2 diff --git a/test/test_configuration/test_defaults.py b/test/test_configuration/test_defaults.py index 07a5aa2f543b1992baf10421de4b28133feb0eac..b46590290eff09ac98d549c7d38010eb5506d09c 100644 --- a/test/test_configuration/test_defaults.py +++ b/test/test_configuration/test_defaults.py @@ -31,7 +31,6 @@ class TestAllDefaults: 'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values', 'pblheight': 'maximum'} - assert DEFAULT_NETWORK == "AIRBASE" assert DEFAULT_STATION_TYPE == "background" assert DEFAULT_VARIABLES == DEFAULT_VAR_ALL_DICT.keys() assert DEFAULT_START == "1997-01-01" diff --git a/test/test_helpers/test_join.py b/test/test_helpers/test_data_sources/test_join.py similarity index 91% rename from test/test_helpers/test_join.py rename to test/test_helpers/test_data_sources/test_join.py index c309b26f597a812d7296872ee4f7c4c9f0baffea..0a9715f5dba725dc7f8fa184021b925857c707eb 100644 --- a/test/test_helpers/test_join.py +++ b/test/test_helpers/test_data_sources/test_join.py @@ -2,11 +2,12 @@ from typing import Iterable import pytest -from mlair.helpers.join import * -from mlair.helpers.join import _save_to_pandas, _correct_stat_name, _lower_list, _select_distinct_series, \ +from mlair.helpers.data_sources.join import * +from mlair.helpers.data_sources.join import _save_to_pandas, _correct_stat_name, _lower_list, _select_distinct_series, \ _select_distinct_data_origin, _select_distinct_network from mlair.configuration.join_settings import join_settings from mlair.helpers.testing import check_nested_equality +from mlair.helpers.data_sources.toar_data import EmptyQueryResult class TestDownloadJoin: @@ -30,7 +31,7 @@ class TestDownloadJoin: with pytest.raises(EmptyQueryResult) as e: download_join("DEBW107", {"o3": "dma8eu", "o10": "maximum"}, "background", data_origin={"o10": ""}) assert e.value.args[-1] == "No data found for variables {'o10'} and options station=['DEBW107'], " \ - "type=background, network=None, origin={'o10': ''} in JOIN." + "type=background, network={'o10': None}, origin={'o10': ''} in JOIN." 


 class TestCorrectDataFormat:
@@ -46,14 +47,6 @@ class TestCorrectDataFormat:
                     "metadata": {"station": "test_station_001", "author": "ME", "success": True}}


-class TestGetData:
-
-    def test(self):
-        opts = {"base": join_settings()[0], "service": "series", "station_id": 'DEBW107', "network_name": "UBA",
-                "parameter_name": "o3,no2"}
-        assert get_data(opts, headers={}) == [[17057, 'UBA', 'DEBW107', 'O3'], [17058, 'UBA', 'DEBW107', 'NO2']]
-
-
 class TestLoadSeriesInformation:

     def test_standard_query(self):
@@ -185,8 +178,8 @@ class TestSelectDistinctNetwork:
     def test_single_network_given_no_match(self, vars):
         with pytest.raises(ValueError) as e:  # AIRBASE not avail for all variables
             _select_distinct_network(vars, ["AIRBASE"])
-        assert e.value.args[-1] == "Cannot find a valid match for requested networks ['AIRBASE'] and variable " \
-                                   "no2 as only following networks are available in JOIN: ['UBA']"
+        assert e.value.args[-1] == "Cannot find a valid match for requested networks ['AIRBASE'] and variable no2 as " \
+                                   "only following networks are available in JOIN: ['UBA']"

         with pytest.raises(ValueError) as e:  # both requested networks are not available for all variables
             _select_distinct_network(vars, ["LUBW", "EMEP"])
@@ -222,6 +215,22 @@ class TestSelectDistinctNetwork:
                            'parameter_name': 'press', 'parameter_label': 'PRESS', 'parameter_attribute': 'REA'}}
         assert check_nested_equality(res, expected) is True

+    def test_multiple_networks_given_by_dict(self, vars):
+        res = _select_distinct_network(vars, {"no2": "UBA", "o3": ["UBA", "AIRBASE"], "temp": ["AIRBASE", "UBA"],
+                                              "press": ["AIRBASE", "UBA"]})
+        expected = {
+            "no2": {'id': 16686, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'no2',
+                    'parameter_label': 'NO2', 'parameter_attribute': ''},
+            "o3": {'id': 16687, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'o3',
+                   'parameter_label': 'O3', 'parameter_attribute': ''},
+            "cloudcover": {'id': 54036, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'cloudcover',
+                           'parameter_label': 'CLOUDCOVER', 'parameter_attribute': 'REA'},
+            "temp": {'id': 88491, 'network_name': 'UBA', 'station_id': 'DENW053', 'parameter_name': 'temp',
+                     'parameter_label': 'TEMP-REA-MIUB', 'parameter_attribute': 'REA'},
+            "press": {'id': 26692, 'network_name': 'AIRBASE', 'station_id': 'DENW053',
+                      'parameter_name': 'press', 'parameter_label': 'PRESS', 'parameter_attribute': 'REA'}}
+        assert check_nested_equality(res, expected) is True
+

 class TestSelectDistinctSeries:

@@ -274,8 +283,8 @@ class TestSelectDistinctSeries:
     def test_network_not_available(self, vars):
         with pytest.raises(ValueError) as e:
             _select_distinct_series(vars, network_name="AIRBASE")
-        assert e.value.args[-1] == "Cannot find a valid match for requested networks ['AIRBASE'] and variable " \
-                                   "no2 as only following networks are available in JOIN: ['UBA']"
+        assert e.value.args[-1] == "Cannot find a valid match for requested networks ['AIRBASE'] and variable no2 as " \
+                                   "only following networks are available in JOIN: ['UBA']"

     def test_different_network_and_origin(self, vars):
         origin = {"no2": "test", "temp": "", "cloudcover": "REA"}
@@ -345,20 +354,3 @@ class TestLowerList:
         assert list(list_iterator) == ["capitalised", "already_small", "uppercase", "verystrange"]


-class TestCreateUrl:
-
-    def test_minimal_args_given(self):
-        url = create_url("www.base.edu", "testingservice")
-        assert url == "www.base.edu/testingservice/?"
-
-    def test_given_kwargs(self):
-        url = create_url("www.base2.edu/", "testingservice", mood="happy", confidence=0.98)
-        assert url == "www.base2.edu/testingservice/?mood=happy&confidence=0.98"
-
-    def test_single_kwargs(self):
-        url = create_url("www.base2.edu/", "testingservice", mood="undefined")
-        assert url == "www.base2.edu/testingservice/?mood=undefined"
-
-    def test_none_kwargs(self):
-        url = create_url("www.base2.edu/", "testingservice", mood="sad", happiness=None, stress_factor=100)
-        assert url == "www.base2.edu/testingservice/?mood=sad&stress_factor=100"
diff --git a/test/test_helpers/test_data_sources/test_toar_data.py b/test/test_helpers/test_data_sources/test_toar_data.py
new file mode 100644
index 0000000000000000000000000000000000000000..277a637b7d0418289d2b1f78e5e2731957041bcd
--- /dev/null
+++ b/test/test_helpers/test_data_sources/test_toar_data.py
@@ -0,0 +1,40 @@
+from mlair.configuration.join_settings import join_settings
+from mlair.helpers.data_sources.toar_data import get_data, create_url
+
+
+class TestGetData:
+
+    def test(self):
+        opts = {"base": join_settings()[0], "service": "series", "station_id": 'DEBW107', "network_name": "UBA",
+                "parameter_name": "o3,no2"}
+        assert get_data(opts, headers={}) == [[17057, 'UBA', 'DEBW107', 'O3'], [17058, 'UBA', 'DEBW107', 'NO2']]
+
+
+class TestCreateUrl:
+
+    def test_minimal_args_given(self):
+        url = create_url("www.base.edu", "testingservice")
+        assert url == "www.base.edu/testingservice/"
+
+    def test_given_kwargs(self):
+        url = create_url("www.base2.edu/", "testingservice", mood="happy", confidence=0.98)
+        assert url == "www.base2.edu/testingservice/?mood=happy&confidence=0.98"
+
+    def test_single_kwargs(self):
+        url = create_url("www.base2.edu/", "testingservice", mood="undefined")
+        assert url == "www.base2.edu/testingservice/?mood=undefined"
+
+    def test_none_kwargs(self):
+        url = create_url("www.base2.edu/", "testingservice", mood="sad", happiness=None, stress_factor=100)
+        assert url == "www.base2.edu/testingservice/?mood=sad&stress_factor=100"
+
+    def test_param_id(self):
+        url = create_url("www.base.edu", "testingservice", param_id="2001")
+        assert url == "www.base.edu/testingservice/2001"
+
+    def test_param_id_kwargs(self):
+        url = create_url("www.base.edu", "testingservice", param_id=2001, mood="sad", happiness=None, stress_factor=100)
+        assert url == "www.base.edu/testingservice/2001?mood=sad&stress_factor=100"
+
+        url = create_url("www.base.edu", "testingservice", param_id=2001, mood="sad", series_id=222)
+        assert url == "www.base.edu/testingservice/2001?mood=sad&series_id=222"
diff --git a/test/test_run_modules/test_pre_processing.py b/test/test_run_modules/test_pre_processing.py
index 1dafdbd5c4882932e3d57e726e7a06bea22a745d..4618a5e4f3f5eaf2a419e68a5a0e18156aa7fb0d 100644
--- a/test/test_run_modules/test_pre_processing.py
+++ b/test/test_run_modules/test_pre_processing.py
@@ -30,7 +30,7 @@ class TestPreProcessing:

     @pytest.fixture
     def obj_with_exp_setup(self):
-        ExperimentSetup(stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW001'],
+        ExperimentSetup(stations=['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW99X'],
                         statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}, station_type="background",
                         data_handler=DefaultDataHandler)
         pre = object.__new__(PreProcessing)
@@ -87,7 +87,7 @@ class TestPreProcessing:
     def test_create_set_split_all_stations(self, caplog, obj_with_exp_setup):
         caplog.set_level(logging.DEBUG)
         obj_with_exp_setup.create_set_split(slice(0, 2), "awesome")
-        message = "Awesome stations (len=6): ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW001']"
+        message = "Awesome stations (len=6): ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087', 'DEBW99X']"
         assert ('root', 10, message) in caplog.record_tuples
         data_store = obj_with_exp_setup.data_store
         assert isinstance(data_store.get("data_collection", "general.awesome"), DataCollection)
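
Two behavioural details in this patch are easier to see outside the diff context. The reworked create_full_time_dim in post_processing.py builds the equidistant index in two steps because a plain pd.date_range stops at midnight of the end date when the sampling is hourly; extending the range by one day and cutting it with closed="left" keeps every hour of the end day. A minimal standalone illustration (the dates are made up for the example; the closed argument follows the call in the diff above, recent pandas releases use inclusive="left" instead):

    import datetime as dt
    import pandas as pd

    # single-step range: stops at 2011-01-02 00:00, i.e. the end day contributes only one hour
    _ind = pd.date_range("2011-01-01", "2011-01-02", freq="1H")

    # two-step range: extend the right edge by one day and drop it again with closed="left",
    # so 2011-01-02 01:00 .. 23:00 become part of the index as well
    full = pd.date_range(_ind.min(), _ind.max() + dt.timedelta(days=1), closed="left", freq="1H")

    print(len(_ind), len(full))  # 25 vs. 48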
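
Similarly, the expected URLs in the new test_toar_data.py pin down how create_url now behaves: no trailing "?" when no query parameters are given (the removed test still expected "www.base.edu/testingservice/?"), None-valued keyword arguments are dropped, and an optional param_id is appended as an extra path segment in front of the query string. The sketch below reproduces exactly these assertions; it is an illustration derived from the tests, not the actual implementation in mlair/helpers/data_sources/toar_data.py:

    def create_url(base: str, service: str, param_id=None, **kwargs) -> str:
        """Build base/service/[param_id][?key=value&...], skipping keyword arguments that are None."""
        url = f"{base.rstrip('/')}/{service}/"   # exactly one slash between base and service
        if param_id is not None:
            url = f"{url}{param_id}"             # path parameter directly after the service segment
        query = "&".join(f"{k}={v}" for k, v in kwargs.items() if v is not None)
        return f"{url}?{query}" if query else url

    # all expectations from TestCreateUrl hold for this sketch, e.g.
    assert create_url("www.base.edu", "testingservice", param_id=2001,
                      mood="sad", series_id=222) == "www.base.edu/testingservice/2001?mood=sad&series_id=222"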