diff --git a/HPC_setup/requirements_HDFML_additionals.txt b/HPC_setup/requirements_HDFML_additionals.txt
index ebfac3cd0d989a8845f2a3fceba33d562b898b8d..6102da7bcc5f61ee1c2ea0d205125a6e5ee641ea 100644
--- a/HPC_setup/requirements_HDFML_additionals.txt
+++ b/HPC_setup/requirements_HDFML_additionals.txt
@@ -9,6 +9,7 @@
 pytest-lazy-fixture==0.6.3
 pytest-metadata==1.11.0
 pytest-sugar==0.9.4
 tabulate==0.8.8
+timezonefinder==5.2.0
 wget==3.2
 --no-binary shapely Shapely==1.7.0
diff --git a/HPC_setup/requirements_JUWELS_additionals.txt b/HPC_setup/requirements_JUWELS_additionals.txt
index ebfac3cd0d989a8845f2a3fceba33d562b898b8d..6102da7bcc5f61ee1c2ea0d205125a6e5ee641ea 100644
--- a/HPC_setup/requirements_JUWELS_additionals.txt
+++ b/HPC_setup/requirements_JUWELS_additionals.txt
@@ -9,6 +9,7 @@
 pytest-lazy-fixture==0.6.3
 pytest-metadata==1.11.0
 pytest-sugar==0.9.4
 tabulate==0.8.8
+timezonefinder==5.2.0
 wget==3.2
 --no-binary shapely Shapely==1.7.0
diff --git a/mlair/configuration/defaults.py b/mlair/configuration/defaults.py
index b630261dbf58d7402f8c3cacaee153347ad4f1e3..f3e0496120fa15e958e3d62e68f7d0915bbcf938 100644
--- a/mlair/configuration/defaults.py
+++ b/mlair/configuration/defaults.py
@@ -9,7 +9,6 @@ DEFAULT_STATIONS = ['DEBW107', 'DEBY081', 'DEBW013', 'DEBW076', 'DEBW087']
 DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
                         'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
                         'pblheight': 'maximum'}
-DEFAULT_NETWORK = "AIRBASE"
 DEFAULT_STATION_TYPE = "background"
 DEFAULT_VARIABLES = DEFAULT_VAR_ALL_DICT.keys()
 DEFAULT_START = "1997-01-01"
diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py
index c26f97cdd7ae43a7f6026801aef39435d517c428..140bc8ccf6db82326abac6b323e0b6d979e29ada 100644
--- a/mlair/data_handler/data_handler_mixed_sampling.py
+++ b/mlair/data_handler/data_handler_mixed_sampling.py
@@ -63,8 +63,8 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation):
         vars = [self.variables, self.target_var]
         stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind])
         data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind],
-                                         self.station_type, self.network, self.store_data_locally, self.data_origin,
-                                         self.start, self.end)
+                                         self.station_type, self.store_data_locally, self.data_origin, self.start,
+                                         self.end)
         data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind],
                                 limit=self.interpolation_limit[ind], sampling=self.sampling[ind])
@@ -147,8 +147,7 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi
         stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind])
         data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind],
-                                         self.station_type, self.network, self.store_data_locally, self.data_origin,
-                                         start, end)
+                                         self.station_type, self.store_data_locally, self.data_origin, start, end)
         data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind],
                                 limit=self.interpolation_limit[ind], sampling=self.sampling[ind])
         return data
diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py
index 516fab7d042ea8d10647ac22de8ae6cc7b220361..e1bcd62be21f9e0e6e04293fe03d2b5c7e70cb63 100644
--- a/mlair/data_handler/data_handler_single_station.py
+++ b/mlair/data_handler/data_handler_single_station.py
@@ -40,7 +40,6 @@ class DataHandlerSingleStation(AbstractDataHandler):
         is 0.
     """
     DEFAULT_STATION_TYPE = "background"
-    DEFAULT_NETWORK = "AIRBASE"
     DEFAULT_VAR_ALL_DICT = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
                             'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
                             'pblheight': 'maximum'}
@@ -59,16 +58,16 @@ class DataHandlerSingleStation(AbstractDataHandler):
     chem_vars = ["benzene", "ch4", "co", "ethane", "no", "no2", "nox", "o3", "ox", "pm1", "pm10", "pm2p5",
                  "propane", "so2", "toluene"]
 
-    _hash = ["station", "statistics_per_var", "data_origin", "station_type", "network", "sampling", "target_dim",
-             "target_var", "time_dim", "iter_dim", "window_dim", "window_history_size", "window_history_offset",
-             "window_lead_time", "interpolation_limit", "interpolation_method", "variables", "window_history_end"]
+    _hash = ["station", "statistics_per_var", "data_origin", "station_type", "sampling", "target_dim", "target_var",
+             "time_dim", "iter_dim", "window_dim", "window_history_size", "window_history_offset", "window_lead_time",
+             "interpolation_limit", "interpolation_method", "variables", "window_history_end"]
 
     def __init__(self, station, data_path, statistics_per_var=None, station_type=DEFAULT_STATION_TYPE,
-                 network=DEFAULT_NETWORK, sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING,
-                 target_dim=DEFAULT_TARGET_DIM, target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM,
-                 iter_dim=DEFAULT_ITER_DIM, window_dim=DEFAULT_WINDOW_DIM,
-                 window_history_size=DEFAULT_WINDOW_HISTORY_SIZE, window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET,
-                 window_history_end=DEFAULT_WINDOW_HISTORY_END, window_lead_time=DEFAULT_WINDOW_LEAD_TIME,
+                 sampling: Union[str, Tuple[str]] = DEFAULT_SAMPLING, target_dim=DEFAULT_TARGET_DIM,
+                 target_var=DEFAULT_TARGET_VAR, time_dim=DEFAULT_TIME_DIM, iter_dim=DEFAULT_ITER_DIM,
+                 window_dim=DEFAULT_WINDOW_DIM, window_history_size=DEFAULT_WINDOW_HISTORY_SIZE,
+                 window_history_offset=DEFAULT_WINDOW_HISTORY_OFFSET, window_history_end=DEFAULT_WINDOW_HISTORY_END,
+                 window_lead_time=DEFAULT_WINDOW_LEAD_TIME,
                  interpolation_limit: Union[int, Tuple[int]] = DEFAULT_INTERPOLATION_LIMIT,
                  interpolation_method: Union[str, Tuple[str]] = DEFAULT_INTERPOLATION_METHOD,
                  overwrite_local_data: bool = False, transformation=None, store_data_locally: bool = True,
@@ -89,7 +88,6 @@ class DataHandlerSingleStation(AbstractDataHandler):
         self._transformation = self.setup_transformation(transformation)
 
         self.station_type = station_type
-        self.network = network
         self.sampling = sampling
         self.target_dim = target_dim
         self.target_var = target_var
@@ -141,9 +139,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
         return self._data.shape, self.get_X().shape, self.get_Y().shape
 
     def __repr__(self):
-        return f"StationPrep(station={self.station}, data_path='{self.path}', " \
-               f"statistics_per_var={self.statistics_per_var}, " \
-               f"station_type='{self.station_type}', network='{self.network}', " \
+        return f"StationPrep(station={self.station}, data_path='{self.path}', data_origin={self.data_origin}, " \
+               f"statistics_per_var={self.statistics_per_var}, station_type='{self.station_type}', " \
                f"sampling='{self.sampling}', target_dim='{self.target_dim}', target_var='{self.target_var}', " \
                f"time_dim='{self.time_dim}', window_history_size={self.window_history_size}, " \
                f"window_lead_time={self.window_lead_time}, interpolation_limit={self.interpolation_limit}, " \
@@ -170,8 +167,12 @@ class DataHandlerSingleStation(AbstractDataHandler):
         return self.get_transposed_label()
 
     def get_coordinates(self):
-        coords = self.meta.loc[["station_lon", "station_lat"]].astype(float)
-        return coords.rename(index={"station_lon": "lon", "station_lat": "lat"}).to_dict()[str(self)]
+        try:
+            coords = self.meta.loc[["station_lon", "station_lat"]].astype(float)
+            coords = coords.rename(index={"station_lon": "lon", "station_lat": "lat"})
+        except KeyError:
+            coords = self.meta.loc[["lon", "lat"]].astype(float)
+        return coords.to_dict()[str(self)]
 
     def call_transform(self, inverse=False):
         opts_input = self._transformation[0]
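Review note: the new try/except in get_coordinates covers both meta layouts. Meta files written from JOIN (toar-data v1) label the rows station_lon/station_lat, while toar-data v2 meta uses plain lon/lat. A minimal sketch of the fallback, with invented station values:

```python
import pandas as pd

# Hypothetical meta frames for station "DEBW107"; values are illustrative only.
meta_v1 = pd.DataFrame({"DEBW107": ["9.2", "48.8"]}, index=["station_lon", "station_lat"])
meta_v2 = pd.DataFrame({"DEBW107": ["9.2", "48.8"]}, index=["lon", "lat"])

def get_coordinates(meta, station):
    # mirrors the patched method; "station" stands in for str(self)
    try:
        coords = meta.loc[["station_lon", "station_lat"]].astype(float)
        coords = coords.rename(index={"station_lon": "lon", "station_lat": "lat"})
    except KeyError:  # v2 meta has no station_* labels
        coords = meta.loc[["lon", "lat"]].astype(float)
    return coords.to_dict()[station]

print(get_coordinates(meta_v1, "DEBW107"))  # {'lon': 9.2, 'lat': 48.8}
print(get_coordinates(meta_v2, "DEBW107"))  # {'lon': 9.2, 'lat': 48.8}
```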
@@ -302,7 +303,7 @@ class DataHandlerSingleStation(AbstractDataHandler):
 
     def make_input_target(self):
         data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling,
-                                         self.station_type, self.network, self.store_data_locally, self.data_origin,
+                                         self.station_type, self.store_data_locally, self.data_origin,
                                          self.start, self.end)
         self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method,
                                       limit=self.interpolation_limit, sampling=self.sampling)
@@ -321,8 +322,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
         self.make_observation(self.target_dim, self.target_var, self.time_dim)
         self.remove_nan(self.time_dim)
 
-    def load_data(self, path, station, statistics_per_var, sampling, station_type=None, network=None,
-                  store_data_locally=False, data_origin: Dict = None, start=None, end=None):
+    def load_data(self, path, station, statistics_per_var, sampling, station_type=None, store_data_locally=False,
+                  data_origin: Dict = None, start=None, end=None):
         """
         Load data and meta data either from local disk (preferred) or download new data by using a custom download
         method.
@@ -340,9 +341,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
             if os.path.exists(meta_file):
                 os.remove(meta_file)
             data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
-                                            station_type=station_type, network=network,
-                                            store_data_locally=store_data_locally, data_origin=data_origin,
-                                            time_dim=self.time_dim, target_dim=self.target_dim,
+                                            station_type=station_type, store_data_locally=store_data_locally,
+                                            data_origin=data_origin, time_dim=self.time_dim, target_dim=self.target_dim,
                                             iter_dim=self.iter_dim)
             logging.debug(f"loaded new data")
         else:
@@ -350,26 +350,24 @@ class DataHandlerSingleStation(AbstractDataHandler):
                 logging.debug(f"try to load local data from: {file_name}")
                 data = xr.open_dataarray(file_name)
                 meta = pd.read_csv(meta_file, index_col=0)
-                self.check_station_meta(meta, station, station_type, network, data_origin)
+                self.check_station_meta(meta, station, station_type, data_origin)
                 logging.debug("loading finished")
             except FileNotFoundError as e:
                 logging.debug(e)
                 logging.debug(f"load new data")
                 data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
-                                                station_type=station_type, network=network,
-                                                store_data_locally=store_data_locally, data_origin=data_origin,
-                                                time_dim=self.time_dim, target_dim=self.target_dim,
-                                                iter_dim=self.iter_dim)
+                                                station_type=station_type, store_data_locally=store_data_locally,
+                                                data_origin=data_origin, time_dim=self.time_dim,
+                                                target_dim=self.target_dim, iter_dim=self.iter_dim)
                 logging.debug("loading finished")
         # create slices and check for negative concentration.
         data = self._slice_prep(data, start=start, end=end)
         data = self.check_for_negative_concentrations(data)
         return data, meta
 
-    def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling,
-                      station_type=None, network=None, store_data_locally=True, data_origin: Dict = None,
-                      time_dim=DEFAULT_TIME_DIM, target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) \
-            -> [xr.DataArray, pd.DataFrame]:
+    def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling, station_type=None,
+                      store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM,
+                      target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) -> [xr.DataArray, pd.DataFrame]:
         """
         Download data from TOAR database using the JOIN interface or load local era5 data.
@@ -382,8 +380,8 @@ class DataHandlerSingleStation(AbstractDataHandler):
         :return: downloaded data and its meta data
         """
         df_all = {}
-        df_era5, df_toar, df_join = None, None, None
-        meta_era5, meta_toar, meta_join = None, None, None
+        df_era5, df_toar = None, None
+        meta_era5, meta_toar = None, None
         if data_origin is not None:
             era5_origin = filter_dict_by_value(data_origin, "era5", True)
             era5_stats = select_from_dict(statistics_per_var, era5_origin.keys())
@@ -401,23 +399,16 @@ class DataHandlerSingleStation(AbstractDataHandler):
             df_era5, meta_era5 = data_sources.era5.load_era5(station_name=station, stat_var=era5_stats,
                                                              sampling=sampling, data_origin=era5_origin)
         if toar_origin is None or len(toar_stats) > 0:
-            # load combined ata from toar-data (v2 & v1)
+            # load combined data from toar-data (v2 & v1)
             df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats,
                                                                       sampling=sampling, data_origin=toar_origin,
                                                                       station_type=station_type)
-        # # load data from toar-data (v2)
-        # df_toar, meta_toar = toar_data.download_toar(station, toar_stats, sampling=sampling, data_origin=toar_origin)
-        #
-        # # load join data (toar-data v1)
-        # df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling,
-        #                                         station_type=station_type, data_origin=toar_origin)
-        #
-        # # fill-up toar-data with join data
-        # a = 1
-
         df = pd.concat([df_era5, df_toar], axis=1, sort=True)
-        meta = meta_era5 if meta_era5 is not None else meta_toar
+        if meta_era5 is not None and meta_toar is not None:
+            meta = meta_era5.combine_first(meta_toar)
+        else:
+            meta = meta_era5 if meta_era5 is not None else meta_toar
         meta.loc["data_origin"] = str(data_origin)
         df_all[station[0]] = df
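Review note: when both ERA5 and TOAR deliver meta data for a station, combine_first keeps the ERA5 entries and fills only the rows that are missing there from the TOAR frame. A small illustration with made-up rows:

```python
import pandas as pd

# Illustrative meta frames; row labels and values are invented for the example.
meta_era5 = pd.DataFrame({"DEBW107": [9.2, 48.8, "era5"]}, index=["lon", "lat", "data_origin"])
meta_toar = pd.DataFrame({"DEBW107": [9.2, 48.8, "background"]}, index=["lon", "lat", "type"])

# keep meta_era5 where present, fill missing rows ("type") from meta_toar
meta = meta_era5.combine_first(meta_toar)
print(meta)
#                 DEBW107
# data_origin        era5
# lat                48.8
# lon                 9.2
# type         background
```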
""" if station_type is not None: - check_dict = {"station_type": station_type, "network_name": network, "data_origin": str(data_origin)} + check_dict = {"station_type": station_type, "type": station_type, "data_origin": str(data_origin)} for (k, v) in check_dict.items(): - if v is None or k not in meta: + if v is None or k not in meta.index: continue if meta.at[k, station[0]] != v: logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != " diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py index 47ccc5510c8135745c518611504cd02900a1f883..6fbdc38fa017aac3331d897355a35277ceed6fdf 100644 --- a/mlair/data_handler/data_handler_with_filter.py +++ b/mlair/data_handler/data_handler_with_filter.py @@ -68,8 +68,8 @@ class DataHandlerFilterSingleStation(DataHandlerSingleStation): def make_input_target(self): data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, - self.station_type, self.network, self.store_data_locally, self.data_origin, - self.start, self.end) + self.station_type, self.store_data_locally, self.data_origin, self.start, + self.end) self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit) self.set_inputs_and_targets() diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index 8ba78f1971d677b64c4293e0d6e5ac35410021b5..ae46ad918630f2a5f083c62b558609e85d7cc2d8 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -168,7 +168,7 @@ class DefaultDataHandler(AbstractDataHandler): dim = self.time_dim intersect = reduce(np.intersect1d, map(lambda x: x.coords[dim].values, X_original)) if len(intersect) < max(self.min_length, 1): - X, Y = None, None + raise ValueError(f"There is no intersection of X.") else: X = list(map(lambda x: x.sel({dim: intersect}), X_original)) Y = Y_original.sel({dim: intersect}) @@ -205,10 +205,6 @@ class DefaultDataHandler(AbstractDataHandler): if True only extract values larger than extreme_values :param timedelta: used as arguments for np.timedelta in order to mark extreme values on datetime """ - # check if X or Y is None - if (self._X is None) or (self._Y is None): - logging.debug(f"{str(self.id_class)} has no data for X or Y, skip multiply extremes") - return if extreme_values is None: logging.debug(f"No extreme values given, skip multiply extremes") self._X_extreme, self._Y_extreme = self._X, self._Y diff --git a/mlair/helpers/data_sources/era5.py b/mlair/helpers/data_sources/era5.py index a4f60afc6e409e7404ef9cd57b27debfa2726875..cd569f3a77871a2566c8962680122b8fc7faed42 100644 --- a/mlair/helpers/data_sources/era5.py +++ b/mlair/helpers/data_sources/era5.py @@ -11,7 +11,7 @@ import xarray as xr from mlair import helpers from mlair.configuration.era5_settings import era5_settings from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings -from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data +from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data, correct_timezone from mlair.helpers.data_sources.toar_data import EmptyQueryResult from mlair.helpers.meteo import relative_humidity_from_dewpoint @@ -56,6 +56,9 @@ def load_era5(station_name, stat_var, sampling, data_origin): else: station_data = station_data[stat_var] + # convert to local timezone + station_data = 
diff --git a/mlair/helpers/data_sources/era5.py b/mlair/helpers/data_sources/era5.py
index a4f60afc6e409e7404ef9cd57b27debfa2726875..cd569f3a77871a2566c8962680122b8fc7faed42 100644
--- a/mlair/helpers/data_sources/era5.py
+++ b/mlair/helpers/data_sources/era5.py
@@ -11,7 +11,7 @@ import xarray as xr
 
 from mlair import helpers
 from mlair.configuration.era5_settings import era5_settings
 from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
-from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data
+from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data, correct_timezone
 from mlair.helpers.data_sources.toar_data import EmptyQueryResult
 from mlair.helpers.meteo import relative_humidity_from_dewpoint
@@ -56,6 +56,9 @@ def load_era5(station_name, stat_var, sampling, data_origin):
     else:
         station_data = station_data[stat_var]
 
+    # convert to local timezone
+    station_data = correct_timezone(station_data, station_meta, sampling)
+
     variable_meta = _emulate_meta_data(station_data)
     meta = combine_meta_data(station_meta, variable_meta)
     meta = pd.DataFrame.from_dict(meta, orient='index')
diff --git a/mlair/helpers/data_sources/join.py b/mlair/helpers/data_sources/join.py
index 0ae1af1c81fe65e3fa168d3ea012607aa37c4e17..df3b358ed187d4c6460f3d40b7ec20599526b86d 100644
--- a/mlair/helpers/data_sources/join.py
+++ b/mlair/helpers/data_sources/join.py
@@ -10,7 +10,7 @@ import pandas as pd
 
 from mlair import helpers
 from mlair.configuration.join_settings import join_settings
-from mlair.helpers.data_sources import toar_data
+from mlair.helpers.data_sources import toar_data, toar_data_v2
 
 
 # join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
@@ -59,6 +59,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
     # download all variables with given statistic
     data = None
     df = None
+    meta = {}
     logging.info(f"load data for {station_name[0]} from JOIN")
     for var in _lower_list(sorted(vars_dict.keys())):
         if var in stat_var.keys():
@@ -83,17 +84,53 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
 
                 # store data in pandas dataframe
                 df = _save_to_pandas(df, data, stat, var)
+                meta[var] = _correct_meta(data["metadata"])
 
                 logging.debug('finished: {}'.format(var))
 
     if data:
-        meta = pd.DataFrame.from_dict(data['metadata'], orient='index')
+        # load station meta using toar-data v2 API and convert to local timezone
+        meta_url_base, headers = toar_data_v2.toar_data_v2_settings("meta")
+        station_meta = toar_data_v2.load_station_information(station_name, meta_url_base, headers)
+        df = toar_data_v2.correct_timezone(df, station_meta, sampling)
+
+        # create meta data
+        meta = toar_data_v2.combine_meta_data(station_meta, meta)
+        meta = pd.DataFrame.from_dict(meta, orient='index')
         meta.columns = station_name
         return df, meta
     else:
         raise toar_data.EmptyQueryResult("No data found in JOIN.")
 
 
+def _correct_meta(meta):
+    meta_out = {}
+    for k, v in meta.items():
+        if k.startswith("station"):
+            _k = k.split("_", 1)[1]
+            _d = meta_out.get("station", {})
+            _d[_k] = v
+            meta_out["station"] = _d
+        elif k.startswith("parameter"):
+            _k = k.split("_", 1)[1]
+            _d = meta_out.get("variable", {})
+            _d[_k] = v
+            meta_out["variable"] = _d
+        elif k == "network_name":
+            if v == "AIRBASE":
+                _d = {"name": "EEA", "longname": "European Environment Agency", "kind": "government"}
+            elif v == "UBA":
+                _d = {"name": "UBA", "longname": "Umweltbundesamt", "kind": "government", "country": "Germany"}
+            else:
+                _d = {"name": v}
+            meta_out["roles"] = [{"contact": {"organisation": _d}}]
+        elif k in ["google_resolution", "numid"]:
+            continue
+        else:
+            meta_out[k] = v
+    return meta_out
+
+
 def split_network_and_origin(origin_network_dict: dict) -> Tuple[Union[None, dict], Union[None, dict]]:
     """
     Split given dict into network and data origin.
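Review note: _correct_meta reshapes the flat JOIN metadata keys into the nested layout that toar_data_v2.combine_meta_data expects: station_* and parameter_* keys move into "station" and "variable" sub-dicts, network_name is translated into a v2-style roles entry, and JOIN-only keys are dropped. A worked example with invented values:

```python
# Requires the patched module; input keys/values are illustrative only.
from mlair.helpers.data_sources.join import _correct_meta

flat = {
    "station_id": "DEBW107",
    "station_lon": 9.2,
    "parameter_name": "o3",
    "network_name": "AIRBASE",
    "numid": 123,  # JOIN-only key, dropped
}

print(_correct_meta(flat))
# {
#   "station": {"id": "DEBW107", "lon": 9.2},
#   "variable": {"name": "o3"},
#   "roles": [{"contact": {"organisation": {"name": "EEA",
#              "longname": "European Environment Agency", "kind": "government"}}}],
# }
```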
@@ -157,24 +194,6 @@ def correct_data_format(data):
     return formatted
 
 
-def load_meta_data(station_name: List[str], station_type: str_or_none, network_name: str_or_none,
-                   join_url_base: str, headers: Dict) -> [Dict, Dict]:
-    opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type,
-            "network_name": network_name, "as_dict": "true",
-            "columns": "station_id,network_name,station_local_id,station_type,station_type_of_area,station_category,"
-                       "station_name,station_country,station_state,station_lon,station_lat,station_alt,"
-                       "station_timezone,station_nightlight_5km,station_climatic_zone,station_wheat_production,"
-                       "station_rice_production,station_nox_emissions,station_omi_no2_column,station_toar_category,"
-                       "station_htap_region,station_reported_alt,station_alt_flag,station_coordinate_status,"
-                       "station_google_alt,station_etopo_alt,station_etopo_min_alt_5km,station_etopo_relative_alt,"
-                       "station_dominant_landcover,station_landcover_description,station_max_nightlight_25km,"
-                       "station_max_population_density_25km,station_nightlight_1km,station_population_density,"
-                       "google_resolution,station_comments,station_max_population_density_5km"}
-    if network_name is None:
-        opts["columns"] = opts["columns"].replace(",network_name", "")
-    return toar_data.get_data(opts, headers)[-1]
-
-
 def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none,
                             join_url_base: str, headers: Dict, data_origin: Dict = None,
                             stat_var: Dict = None) -> [Dict, Dict]:
     """
diff --git a/mlair/helpers/data_sources/toar_data.py b/mlair/helpers/data_sources/toar_data.py
index 70d62238c98b8469c5a4324ac33ab67951210610..b01fb3fd10be08dab8436ca39aeb1df4148bc5dc 100644
--- a/mlair/helpers/data_sources/toar_data.py
+++ b/mlair/helpers/data_sources/toar_data.py
@@ -9,6 +9,7 @@ from . import join, toar_data_v2
 import requests
 from requests.adapters import HTTPAdapter
 from requests.packages.urllib3.util.retry import Retry
+import pandas as pd
 
 
 class EmptyQueryResult(Exception):
@@ -29,9 +30,11 @@ def create_url(base: str, service: str, param_id: Union[str, int, None] = None,
 
     :return: combined url as string
     """
-    if not base.endswith("/"):
-        base += "/"
-    url = f"{base}{service}"
+    url = f"{base}"
+    if not url.endswith("/"):
+        url += "/"
+    if service is not None:
+        url = f"{url}{service}"
     if not url.endswith("/"):
         url += "/"
     if param_id is not None:
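Review note: create_url now tolerates service=None, which load_timeseries_data (further down) uses for daily aggregates, where the series id is appended directly to the base url. A quick sketch; the host is a placeholder, and the expected outputs assume that the unchanged remainder of create_url still appends param_id and keyword arguments as a query string:

```python
from mlair.helpers.data_sources.toar_data import create_url

# service is appended only when given; the base always ends with a slash
print(create_url("https://example.org/api", "data/timeseries", param_id=42, format="csv"))
# expected: https://example.org/api/data/timeseries/42?format=csv

print(create_url("https://example.org/api", None, param_id=42, format="csv"))
# expected: https://example.org/api/42?format=csv
```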
@@ -79,11 +82,19 @@ def download_toar(station, toar_stats, sampling, data_origin, station_type=None
     df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=data_origin)
 
     # load join data (toar-data v1)
-    df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling,
-                                            station_type=station_type, data_origin=data_origin)
-
-    return df_toar
-
-
-def merge_toar_join(df_toar, df_join):
-    start_date = min([df_toar.index.min(), df_join.index.min()])
\ No newline at end of file
+    df_join, _ = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling,
+                                    station_type=station_type, data_origin=data_origin)
+
+    # merge both data sources with priority on toar-data v2
+    df_merged = merge_toar_join(df_toar, df_join, sampling)
+    return df_merged, meta_toar
+
+
+def merge_toar_join(df_toar, df_join, sampling):
+    start_date = min([df_toar.index.min(), df_join.index.min()])
+    end_date = max([df_toar.index.max(), df_join.index.max()])
+    freq = {"hourly": "1H", "daily": "1d"}.get(sampling)
+    full_time = pd.date_range(start_date, end_date, freq=freq)
+    full_data = df_toar.reindex(full_time)
+    full_data.update(df_join, overwrite=False)
+    return full_data
diff --git a/mlair/helpers/data_sources/toar_data_v2.py b/mlair/helpers/data_sources/toar_data_v2.py
index bf85dd9e93c5b0fc3fb111090ff295bafa567904..5f8e831f3f77d9f3b4e933eae6ff011a58c97711 100644
--- a/mlair/helpers/data_sources/toar_data_v2.py
+++ b/mlair/helpers/data_sources/toar_data_v2.py
@@ -8,6 +8,8 @@ from typing import Union, List, Dict
 from io import StringIO
 
 import pandas as pd
+import pytz
+from timezonefinder import TimezoneFinder
 
 from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
 from mlair.helpers import to_list
@@ -65,10 +67,13 @@ def download_toar(station_name: Union[str, List[str]], stat_var: dict,
     data_dict = {}
     for var, meta in timeseries_meta.items():
         logging.debug(f"load {var}")
-        meta, opts = prepare_meta(meta, sampling, stat_var, var)
-        data_var = load_timeseries_data([meta[0]], data_url_base, opts, headers)[0]
-        data_dict[var] = data_var
+        meta_and_opts = prepare_meta(meta, sampling, stat_var, var)
+        data_var = []
+        for var_meta, opts in meta_and_opts:
+            data_var.extend(load_timeseries_data(var_meta, data_url_base, opts, headers, sampling))
+        data_dict[var] = merge_data(*data_var, sampling=sampling)
     data = pd.DataFrame.from_dict(data_dict)
+    data = correct_timezone(data, station_meta, sampling)
 
     meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()})
     meta = pd.DataFrame.from_dict(meta, orient='index')
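Review note: merge_toar_join above and merge_data below follow the same pattern: reindex the priority frame (toar-data v2) onto the union of both time axes, then fill only the gaps via DataFrame.update with overwrite=False, so v2 values always win. A toy example:

```python
import pandas as pd

idx = pd.date_range("2020-01-01", periods=4, freq="1H")
df_toar = pd.DataFrame({"o3": [1.0, None, 3.0]}, index=idx[:3])  # priority source: shorter, has a gap
df_join = pd.DataFrame({"o3": [9.0, 2.0, 9.0, 4.0]}, index=idx)  # secondary source

full_data = df_toar.reindex(idx)            # span the union of both time axes
full_data.update(df_join, overwrite=False)  # fill gaps only; existing toar values win
print(full_data["o3"].tolist())             # [1.0, 2.0, 3.0, 4.0]
```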
@@ -76,15 +81,52 @@
     return data, meta
 
 
+def merge_data(*args, sampling="hourly"):
+    start_date = min(map(lambda x: x.index.min(), args))
+    end_date = max(map(lambda x: x.index.max(), args))
+    freq = {"hourly": "1H", "daily": "1d"}.get(sampling)
+    full_time = pd.date_range(start_date, end_date, freq=freq)
+    full_data = args[0].reindex(full_time)
+    if not isinstance(full_data, pd.DataFrame):
+        full_data = full_data.to_frame()
+    for d in args[1:]:
+        full_data.update(d, overwrite=False)
+    return full_data.squeeze()
+
+
+def correct_timezone(data, meta, sampling):
+    """
+    Extract timezone information and convert data index to this timezone.
+
+    Uses UTC if no information is provided. Note that this method only modifies data with sampling='hourly'. In all
+    other cases, it returns the given data without any change. This method expects the date index of data to be in
+    UTC. Timezone information is not added to the index in order to avoid daylight saving time and ambiguous
+    timestamps.
+    """
+    if sampling == "hourly":
+        tz_info = meta.get("timezone", "UTC")
+        try:
+            tz = pytz.timezone(tz_info)
+        except pytz.exceptions.UnknownTimeZoneError as e:
+            lon, lat = meta["coordinates"]["lng"], meta["coordinates"]["lat"]
+            tz = pytz.timezone(TimezoneFinder().timezone_at(lng=lon, lat=lat))
+        index = data.index
+        index = index.tz_localize(None)
+        utc_offset = tz.utcoffset(index[0]) - tz.dst(index[0])
+        data.index = index + utc_offset
+    return data
+
+
 def prepare_meta(meta, sampling, stat_var, var):
-    meta = meta[0]
-    opts = {}
-    if sampling == "daily":
-        opts["timeseries_id"] = meta.pop("id")
-        meta["id"] = None
-    opts["names"] = stat_var[var]
-    opts["sampling"] = sampling
-    return [meta], opts
+    out = []
+    for m in meta:
+        opts = {}
+        if sampling == "daily":
+            opts["timeseries_id"] = m.pop("id")
+            m["id"] = None
+        opts["names"] = stat_var[var]
+        opts["sampling"] = sampling
+        out.append(([m], opts))
+    return out
 
 
 def combine_meta_data(station_meta, timeseries_meta):
@@ -120,16 +162,18 @@
     return meta
 
 
-def load_timeseries_data(timeseries_meta, url_base, opts, headers):
+def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling):
     coll = []
     for meta in timeseries_meta:
         series_id = meta["id"]
         # opts = {"base": url_base, "service": f"data/timeseries/{series_id}"}
         opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv", **opts}
+        if sampling != "hourly":
+            opts["service"] = None
         res = get_data(opts, headers, as_json=False)
         data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True,
                            infer_datetime_format=True)
-        data = data["value"].rename(meta["variable"]["name"])
+        data = data[opts.get("names", "value")].rename(meta["variable"]["name"])
         coll.append(data)
     return coll
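Review note: correct_timezone shifts the tz-naive UTC index by a fixed standard-time offset (utcoffset minus the DST share) instead of localizing it, so the index stays free of ambiguous timestamps; when the station meta has no usable timezone name, the zone is recovered from the coordinates via timezonefinder (hence the new requirements pins). A sketch of the offset arithmetic for an assumed Berlin station:

```python
import pandas as pd
import pytz
from timezonefinder import TimezoneFinder

index = pd.date_range("2020-06-01", periods=3, freq="1H")  # naive, interpreted as UTC

# resolve the timezone from coordinates when the meta entry is missing or unknown
tz_name = TimezoneFinder().timezone_at(lng=13.4, lat=52.5)  # 'Europe/Berlin'
tz = pytz.timezone(tz_name)

# fixed standard-time offset: utcoffset minus DST share (CEST +02:00 -> CET +01:00)
utc_offset = tz.utcoffset(index[0]) - tz.dst(index[0])
print(index + utc_offset)  # shifted by +01:00, still tz-naive
```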
diff --git a/mlair/run_modules/experiment_setup.py b/mlair/run_modules/experiment_setup.py
index d807db14c96a4a30fde791e54c8b1b32e519fb9c..5e7efde64dda069c57e2b7bb63faa8064f65a57d 100644
--- a/mlair/run_modules/experiment_setup.py
+++ b/mlair/run_modules/experiment_setup.py
@@ -10,7 +10,7 @@ from dill.source import getsource
 
 from mlair.configuration import path_config
 from mlair import helpers
-from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_NETWORK, DEFAULT_STATION_TYPE, \
+from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_STATION_TYPE, \
     DEFAULT_START, DEFAULT_END, DEFAULT_WINDOW_HISTORY_SIZE, DEFAULT_OVERWRITE_LOCAL_DATA, \
     DEFAULT_HPC_LOGIN_LIST, DEFAULT_HPC_HOST_LIST, DEFAULT_CREATE_NEW_MODEL, DEFAULT_TRAIN_MODEL, \
     DEFAULT_FRACTION_OF_TRAINING, DEFAULT_EXTREME_VALUES, DEFAULT_EXTREMES_ON_RIGHT_TAIL_ONLY, DEFAULT_PERMUTE_DATA, \
diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py
index 8c5080f2cc64fe0f1391457d0907f5ba109a92df..7dc61f89967b5992434e948fe2ba1fc37a4b651b 100644
--- a/mlair/run_modules/post_processing.py
+++ b/mlair/run_modules/post_processing.py
@@ -13,6 +13,7 @@ from typing import Dict, Tuple, Union, List, Callable
 import numpy as np
 import pandas as pd
 import xarray as xr
+import datetime as dt
 
 from mlair.configuration import path_config
 from mlair.data_handler import Bootstraps, KerasIterator
diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py
index de7000244e879aacdd20b8e0a70ef68e847e0717..7fb272f642aa0d7033cc1f28d4779dd6ad76e66e 100644
--- a/mlair/run_modules/pre_processing.py
+++ b/mlair/run_modules/pre_processing.py
@@ -114,8 +114,8 @@ class PreProcessing(RunEnvironment):
         +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+
 
         """
-        meta_cols = ['station_name', 'station_lon', 'station_lat', 'station_alt']
-        meta_round = ["station_lon", "station_lat", "station_alt"]
+        meta_cols = ["name", "lat", "lon", "alt", "country", "state", "type", "type_of_area", "toar1_category"]
+        meta_round = ["lat", "lon", "alt"]
         precision = 4
         path = os.path.join(self.data_store.get("experiment_path"), "latex_report")
         path_config.check_path_and_create(path)
@@ -402,8 +402,9 @@ def f_proc(data_handler, station, name_affix, store, return_strategy="", tmp_pat
 
 def f_proc_create_info_df(data, meta_cols):
     station_name = str(data.id_class)
+    meta = data.id_class.meta
     res = {"station_name": station_name, "Y_shape": data.get_Y()[0].shape[0],
-           "meta": data.id_class.meta.loc[meta_cols].values.flatten()}
+           "meta": meta.reindex(meta_cols).values.flatten()}
     return res
diff --git a/requirements.txt b/requirements.txt
index 3afc17b67fddbf5a269df1e1b7e103045630a290..353c0092237ef30ef21c3c225e899b8e21a823f0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -28,6 +28,7 @@ six==1.15.0
 statsmodels==0.12.2
 tabulate==0.8.9
 tensorflow==2.5.0
+timezonefinder==5.2.0
 toolz==0.11.2
 typing_extensions==3.7.4.3
 wget==3.2
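Review note: in f_proc_create_info_df above, reindex replaces .loc so that a station whose meta lacks some of the new toar-data v2 columns yields NaN cells in the report table instead of raising a KeyError. Minimal sketch with invented meta:

```python
import pandas as pd

meta = pd.DataFrame({"DEBW107": ["Aalen", 48.8, 9.2]}, index=["name", "lat", "lon"])
meta_cols = ["name", "lat", "lon", "alt", "country"]

# .loc on missing labels raises a KeyError in recent pandas; reindex fills NaN instead
row = meta.reindex(meta_cols).values.flatten()
print(row)  # ['Aalen' 48.8 9.2 nan nan]
```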