diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 37bc17a9bce9cadfda732120f87df3d39a50d921..4eb5e3a46dbcd8c32d9a2962227b70ce2900554c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -46,10 +46,7 @@ tests (from scratch): - pip install --upgrade pip - zypper --no-gpg-checks addrepo https://download.opensuse.org/repositories/Application:Geo/15.4/Application:Geo.repo - zypper --no-gpg-checks refresh - - zypper --no-gpg-checks --non-interactive install proj=9.1.0 - - zypper --no-gpg-checks --non-interactive install geos=3.11.1 - - zypper --no-gpg-checks --non-interactive install geos-devel=3.9.1 - # - zypper --no-gpg-checks --non-interactive install libproj22=8.2.1 + - zypper --no-gpg-checks --non-interactive install proj geos geos-devel - zypper --no-gpg-checks --non-interactive install binutils libproj-devel gdal-devel - pip install -r requirements.txt - chmod +x ./CI/run_pytest.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index d489da6b31362241a71779435bb6b57e535261bd..c91817975db1f9420c761c6c5c482b8ebeb321df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,25 @@ # Changelog All notable changes to this project will be documented in this file. +## v2.4.0 - 2023-06-30 - IFS data and bias-corrected evaluation + +### general: +* support IFS data (local) and ERA5 data (from toar db) +* bias-free evaluation +### new features: +* can load local IFS forecast data as input (#450) +* can also load ERA5 data from toar db as alternative to locally stored data (#449) +* new plot to show monthly data distributions in subsets (#445) +* can load a DL model from external path (#448) +* introduced option to bias-correct model's and competitors' forecasts (#442) +* can use different interpolation methods when having CAMS as competitor (#444) +### technical: +* change toar statistics from api v1 to api v2 (#454) +* now able to set configuration paths for local era5 and ifs data as experiment parameter (#457) +* improved retry strategy when downloading data from toar db (#453) +* updated packages (#452) +* calculation of filter apriori is more robust now, properties are stored inside experiment folder (#447, #451) + ## v2.3.0 - 2022-11-25 - new models and plots ### general: diff --git a/README.md b/README.md index 212aac2ac068e88f957b8f8cfb756c8ce9e476f9..c83069d9e1552804be6fe58d0a292008b3e03cbf 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ HPC systems, see [here](#special-instructions-for-installation-on-jülich-hpc-sy * Installation of **MLAir**: * Either clone MLAir from the [gitlab repository](https://gitlab.jsc.fz-juelich.de/esde/machine-learning/mlair.git) and use it without installation (beside the requirements) - * or download the distribution file ([current version](https://gitlab.jsc.fz-juelich.de/esde/machine-learning/mlair/-/blob/master/dist/mlair-2.2.0-py3-none-any.whl)) + * or download the distribution file ([current version](https://gitlab.jsc.fz-juelich.de/esde/machine-learning/mlair/-/blob/master/dist/mlair-2.4.0-py3-none-any.whl)) and install it via `pip install <dist_file>.whl`. In this case, you can simply import MLAir in any python script inside your virtual environment using `import mlair`.
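The following sketch shows how the v2.4.0 additions above might be combined in a single run. It is illustrative only: `mlair.run()`, `stations`, `sampling`, `data_origin`, `data_handler` and `plot_list` are pre-existing experiment parameters, while the remaining keyword names are assumptions that mirror the data-handler keywords and `DEFAULT_*` constants introduced in this patch (`era5_data_path`, `era5_file_names`, `ifs_data_path`, `ifs_file_names`, `do_bias_free_evaluation`); station codes and paths are placeholders.

```python
import mlair
from mlair.data_handler.data_handler_mixed_sampling import DataHandlerIFS

# Sketch only: keyword names marked "assumed" follow the naming conventions visible in this
# patch, not a documented interface.
mlair.run(
    stations=["DEBW107", "DEBW013"],                 # placeholder station codes
    data_handler=DataHandlerIFS,                     # new handler: IFS forecasts as inputs, climate FIR filtering
    sampling=("hourly", "daily"),                    # hourly inputs (local ERA5/IFS is hourly only), daily targets
    data_origin={"temp": "era5_local",               # variables read from local ERA5 files
                 "press": "era5_local",
                 "u": "ifs", "v": "ifs",             # variables read from local IFS forecasts
                 "o3": ""},                          # chemistry still comes from the TOAR database
    era5_data_path="/path/to/era5",                  # assumed experiment parameters for local data (#457)
    era5_file_names="*.nc",
    ifs_data_path="/path/to/ifs",
    ifs_file_names="sfc_*.nc",
    do_bias_free_evaluation=True,                    # assumed name derived from DEFAULT_DO_BIAS_FREE_EVALUATION (#442)
    plot_list=["PlotSampleUncertaintyFromBootstrap", "PlotSeasonalMSEStack", "PlotErrorsOnMap"],
)
```

Because `DEFAULT_DO_BIAS_FREE_EVALUATION` is `True`, the bias-corrected evaluation would also run when the flag is omitted; it only needs to be passed explicitly to switch it off.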
diff --git a/dist/mlair-2.4.0-py3-none-any.whl b/dist/mlair-2.4.0-py3-none-any.whl new file mode 100644 index 0000000000000000000000000000000000000000..63d4bb6b219840a218a50ab9da0b3c48a17ef18e Binary files /dev/null and b/dist/mlair-2.4.0-py3-none-any.whl differ diff --git a/docs/_source/installation.rst b/docs/_source/installation.rst index 182489dbd5fd60c38808eebf66ac32bd661ec6ca..030b8f6b05f2141c5d6403b0be13e9b2e7438b66 100644 --- a/docs/_source/installation.rst +++ b/docs/_source/installation.rst @@ -27,7 +27,7 @@ Installation of MLAir * Install all requirements from `requirements.txt <https://gitlab.jsc.fz-juelich.de/esde/machine-learning/mlair/-/blob/master/requirements.txt>`_ preferably in a virtual environment * Either clone MLAir from the `gitlab repository <https://gitlab.jsc.fz-juelich.de/esde/machine-learning/mlair.git>`_ -* or download the distribution file (`current version <https://gitlab.jsc.fz-juelich.de/esde/machine-learning/mlair/-/blob/master/dist/mlair-2.2.0-py3-none-any.whl>`_) +* or download the distribution file (`current version <https://gitlab.jsc.fz-juelich.de/esde/machine-learning/mlair/-/blob/master/dist/mlair-2.4.0-py3-none-any.whl>`_) and install it via :py:`pip install <dist_file>.whl`. In this case, you can simply import MLAir in any python script inside your virtual environment using :py:`import mlair`. diff --git a/mlair/__init__.py b/mlair/__init__.py index e55918caf0a84391ba912d31fc18c2e166c1317b..d76d1b3dbf250e77ac4eb6ecb8f9e30990cff372 100644 --- a/mlair/__init__.py +++ b/mlair/__init__.py @@ -1,6 +1,6 @@ __version_info__ = { 'major': 2, - 'minor': 3, + 'minor': 4, 'micro': 0, } diff --git a/mlair/configuration/defaults.py b/mlair/configuration/defaults.py index d0b3592905c32a4c7af875014532a5539805dd3a..75a16c374d030c4c00e946c299957e1304f83977 100644 --- a/mlair/configuration/defaults.py +++ b/mlair/configuration/defaults.py @@ -53,6 +53,7 @@ DEFAULT_DO_UNCERTAINTY_ESTIMATE = True DEFAULT_UNCERTAINTY_ESTIMATE_BLOCK_LENGTH = "1m" DEFAULT_UNCERTAINTY_ESTIMATE_EVALUATE_COMPETITORS = True DEFAULT_UNCERTAINTY_ESTIMATE_N_BOOTS = 1000 +DEFAULT_DO_BIAS_FREE_EVALUATION = True DEFAULT_EVALUATE_FEATURE_IMPORTANCE = True DEFAULT_FEATURE_IMPORTANCE_CREATE_NEW_BOOTSTRAPS = False DEFAULT_FEATURE_IMPORTANCE_N_BOOTS = 20 @@ -61,7 +62,8 @@ DEFAULT_FEATURE_IMPORTANCE_BOOTSTRAP_METHOD = "shuffle" DEFAULT_PLOT_LIST = ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore", "PlotTimeSeries", "PlotCompetitiveSkillScore", "PlotFeatureImportanceSkillScore", "PlotConditionalQuantiles", "PlotAvailability", "PlotAvailabilityHistogram", "PlotDataHistogram", "PlotPeriodogram", - "PlotSampleUncertaintyFromBootstrap"] + "PlotSampleUncertaintyFromBootstrap", "PlotErrorMetrics", "PlotDataMonthlyDistribution", + "PlotTimeEvolutionMetric", "PlotSeasonalMSEStack", "PlotErrorsOnMap"] DEFAULT_SAMPLING = "daily" DEFAULT_DATA_ORIGIN = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA", "temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA", "no": "", "no2": "", "o3": "", diff --git a/mlair/configuration/era5_settings.py b/mlair/configuration/era5_settings.py index 9f44176bd50bf95226a0ea7a4913152a34619f9a..7b09bd83ec7a92afc95e7966b1157e78bd897cfe 100644 --- a/mlair/configuration/era5_settings.py +++ b/mlair/configuration/era5_settings.py @@ -3,7 +3,7 @@ from typing import Tuple -def era5_settings(sampling="daily") -> Tuple[str, str]: +def era5_settings(sampling="daily", era5_data_path=None, era5_file_names=None) -> Tuple[str, str]:
""" Check for sampling as only hourly resolution is supported by era5 and return path on HPC systems. @@ -12,8 +12,8 @@ def era5_settings(sampling="daily") -> Tuple[str, str]: :return: HPC path """ if sampling == "hourly": # pragma: no branch - ERA5_DATA_PATH = "." - FILE_NAMES = "*.nc" + ERA5_DATA_PATH = era5_data_path or "." + FILE_NAMES = era5_file_names or "*.nc" else: raise NameError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.") return ERA5_DATA_PATH, FILE_NAMES diff --git a/mlair/configuration/ifs_settings.py b/mlair/configuration/ifs_settings.py new file mode 100644 index 0000000000000000000000000000000000000000..dd40d72ee0f479787bdfbb2a0a8849dc28d0d183 --- /dev/null +++ b/mlair/configuration/ifs_settings.py @@ -0,0 +1,19 @@ +"""Settings to access not public era5 data.""" + +from typing import Tuple + + +def ifs_settings(sampling="daily", ifs_data_path=None, ifs_file_names=None) -> Tuple[str, str]: + """ + Check for sampling as only hourly resolution is supported by ifs and return path on HPC systems. + + :param sampling: temporal resolution to load data for, only hourly supported (default "daily") + + :return: HPC path + """ + if sampling == "hourly": # pragma: no branch + IFS_DATA_PATH = ifs_data_path or "." + FILE_NAMES = ifs_file_names or "*.nc" + else: + raise NameError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.") + return IFS_DATA_PATH, FILE_NAMES diff --git a/mlair/configuration/toar_data_v2_settings.py b/mlair/configuration/toar_data_v2_settings.py index a8bb9f42cf5a1967f150aa18019c2dbdc89f43a2..da17b90d32088431566f93ae951545ff5a168079 100644 --- a/mlair/configuration/toar_data_v2_settings.py +++ b/mlair/configuration/toar_data_v2_settings.py @@ -10,7 +10,7 @@ def toar_data_v2_settings(sampling="daily") -> Tuple[str, Dict]: :return: Service url and optional headers """ if sampling == "daily": # pragma: no branch - TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/statistics/api/v1/" + TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2/analysis/statistics/" headers = {} elif sampling == "hourly" or sampling == "meta": TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2/" diff --git a/mlair/data_handler/data_handler_mixed_sampling.py b/mlair/data_handler/data_handler_mixed_sampling.py index eaa6a21175bd5f88c32c9c3cb74947c0cc0956a3..addf864c3ca2ab88c565eb09e517a93a7960613c 100644 --- a/mlair/data_handler/data_handler_mixed_sampling.py +++ b/mlair/data_handler/data_handler_mixed_sampling.py @@ -62,8 +62,9 @@ class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation): def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]: vars = [self.variables, self.target_var] stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) + data_origin = helpers.select_from_dict(self.data_origin, vars[ind]) data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind], - self.store_data_locally, self.data_origin, self.start, self.end) + self.store_data_locally, data_origin, self.start, self.end) data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind], limit=self.interpolation_limit[ind], sampling=self.sampling[ind]) @@ -144,9 +145,10 @@ class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSi start, end = self.update_start_end(ind) vars = [self.variables, self.target_var] stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) + data_origin = 
helpers.select_from_dict(self.data_origin, vars[ind]) data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind], - self.store_data_locally, self.data_origin, start, end) + self.store_data_locally, data_origin, start, end) data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind], limit=self.interpolation_limit[ind], sampling=self.sampling[ind]) return data @@ -466,3 +468,41 @@ class DataHandlerMixedSamplingWithClimateAndFirFilter(DataHandlerMixedSamplingWi else: # if no unfiltered meteo branch transformation_res["filtered_meteo"] = transformation_meteo return transformation_res if len(transformation_res) > 0 else None + + +class DataHandlerIFSSingleStation(DataHandlerMixedSamplingWithClimateFirFilterSingleStation): + + def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]: + start, end = self.update_start_end(ind) + vars = [self.variables, self.target_var] + stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) + data_origin = helpers.select_from_dict(self.data_origin, vars[ind]) + + data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind], + self.store_data_locally, data_origin, start, end) + if ind == 1: # only for target + data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind], + limit=self.interpolation_limit[ind], sampling=self.sampling[ind]) + return data + + def make_input_target(self): + """ + A FIR filter is applied on the input data that has hourly resolution. Labels Y are provided as aggregated values + with daily resolution. + """ + self._data = tuple(map(self.load_and_interpolate, [0, 1])) # load input (0) and target (1) data + self.set_inputs_and_targets() + self.apply_filter() + + +class DataHandlerIFS(DataHandlerMixedSamplingWithClimateAndFirFilter): + + data_handler_fir = (DataHandlerMixedSamplingWithFirFilterSingleStation, + DataHandlerIFSSingleStation) + + @classmethod + def set_data_handler_fir_pos(cls, **kwargs): + """ + Set position of fir data handler to always use climate FIR. 
+ """ + cls.data_handler_fir_pos = 1 # use slower fir version with climate estimate diff --git a/mlair/data_handler/data_handler_single_station.py b/mlair/data_handler/data_handler_single_station.py index ec0f1f73282979a1e69945e1ad7f6817bdf3ba12..54093ab4579d4b4a141e651213a64d093ac6f16d 100644 --- a/mlair/data_handler/data_handler_single_station.py +++ b/mlair/data_handler/data_handler_single_station.py @@ -11,6 +11,7 @@ import dill import hashlib import logging import os +import ast from functools import reduce, partial from typing import Union, List, Iterable, Tuple, Dict, Optional @@ -22,7 +23,7 @@ from mlair.configuration import check_path_and_create from mlair import helpers from mlair.helpers import statistics, TimeTrackingWrapper, filter_dict_by_value, select_from_dict from mlair.data_handler.abstract_data_handler import AbstractDataHandler -from mlair.helpers import data_sources +from mlair.helpers import data_sources, check_nested_equality # define a more general date type for type hinting date = Union[dt.date, dt.datetime] @@ -70,7 +71,8 @@ class DataHandlerSingleStation(AbstractDataHandler): interpolation_method: Union[str, Tuple[str]] = DEFAULT_INTERPOLATION_METHOD, overwrite_local_data: bool = False, transformation=None, store_data_locally: bool = True, min_length: int = 0, start=None, end=None, variables=None, data_origin: Dict = None, - lazy_preprocessing: bool = False, overwrite_lazy_data=False, **kwargs): + lazy_preprocessing: bool = False, overwrite_lazy_data=False, era5_data_path=None, era5_file_names=None, + ifs_data_path=None, ifs_file_names=None, **kwargs): super().__init__() self.station = helpers.to_list(station) self.path = self.setup_data_path(data_path, sampling) @@ -114,6 +116,11 @@ class DataHandlerSingleStation(AbstractDataHandler): self.label = None self.observation = None + self._era5_data_path = era5_data_path + self._era5_file_names = era5_file_names + self._ifs_data_path = ifs_data_path + self._ifs_file_names = ifs_file_names + # create samples self.setup_samples() self.clean_up() @@ -299,8 +306,11 @@ class DataHandlerSingleStation(AbstractDataHandler): self._data, self.input_data, self.target_data = list(map(f_prep, [_data, _input_data, _target_data])) def make_input_target(self): - data, self.meta = self.load_data(self.path, self.station, self.statistics_per_var, self.sampling, - self.store_data_locally, self.data_origin, self.start, self.end) + vars = [*self.variables, self.target_var] + stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars) + data_origin = helpers.select_from_dict(self.data_origin, vars) + data, self.meta = self.load_data(self.path, self.station, stats_per_var, self.sampling, + self.store_data_locally, data_origin, self.start, self.end) self._data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method, limit=self.interpolation_limit, sampling=self.sampling) self.set_inputs_and_targets() @@ -313,7 +323,7 @@ class DataHandlerSingleStation(AbstractDataHandler): self.target_data = targets def make_samples(self): - self.make_history_window(self.target_dim, self.window_history_size, self.time_dim) #todo stopped here + self.make_history_window(self.target_dim, self.window_history_size, self.time_dim) self.make_labels(self.target_dim, self.target_var, self.time_dim, self.window_lead_time) self.make_observation(self.target_dim, self.target_var, self.time_dim) self.remove_nan(self.time_dim) @@ -331,94 +341,43 @@ class DataHandlerSingleStation(AbstractDataHandler): file_name = self._set_file_name(path, 
station, statistics_per_var) meta_file = self._set_meta_file_name(path, station, statistics_per_var) if self.overwrite_local_data is True: - logging.debug(f"overwrite_local_data is true, therefore reload {file_name}") + logging.debug(f"{self.station[0]}: overwrite_local_data is true, therefore reload {file_name}") if os.path.exists(file_name): os.remove(file_name) if os.path.exists(meta_file): os.remove(meta_file) - data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling, - store_data_locally=store_data_locally, data_origin=data_origin, - time_dim=self.time_dim, target_dim=self.target_dim, iter_dim=self.iter_dim) - logging.debug(f"loaded new data") + data, meta = data_sources.download_data(file_name, meta_file, station, statistics_per_var, sampling, + store_data_locally=store_data_locally, data_origin=data_origin, + time_dim=self.time_dim, target_dim=self.target_dim, + iter_dim=self.iter_dim, window_dim=self.window_dim, + era5_data_path=self._era5_data_path, + era5_file_names=self._era5_file_names, + ifs_data_path=self._ifs_data_path, + ifs_file_names=self._ifs_file_names) + logging.debug(f"{self.station[0]}: loaded new data") else: try: - logging.debug(f"try to load local data from: {file_name}") + logging.debug(f"{self.station[0]}: try to load local data from: {file_name}") data = xr.open_dataarray(file_name) meta = pd.read_csv(meta_file, index_col=0) self.check_station_meta(meta, station, data_origin, statistics_per_var) - logging.debug("loading finished") + logging.debug(f"{self.station[0]}: loading finished") except FileNotFoundError as e: - logging.debug(e) - logging.debug(f"load new data") - data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling, - store_data_locally=store_data_locally, data_origin=data_origin, - time_dim=self.time_dim, target_dim=self.target_dim, - iter_dim=self.iter_dim) - logging.debug("loading finished") + logging.debug(f"{self.station[0]}: {e}") + logging.debug(f"{self.station[0]}: load new data") + data, meta = data_sources.download_data(file_name, meta_file, station, statistics_per_var, sampling, + store_data_locally=store_data_locally, data_origin=data_origin, + time_dim=self.time_dim, target_dim=self.target_dim, + iter_dim=self.iter_dim, era5_data_path=self._era5_data_path, + era5_file_names=self._era5_file_names, + ifs_data_path=self._ifs_data_path, + ifs_file_names=self._ifs_file_names) + logging.debug(f"{self.station[0]}: loading finished") # create slices and check for negative concentration. data = self._slice_prep(data, start=start, end=end) data = self.check_for_negative_concentrations(data) return data, meta - def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling, - store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM, - target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) -> [xr.DataArray, pd.DataFrame]: - """ - Download data from TOAR database using the JOIN interface or load local era5 data. - - Data is transformed to a xarray dataset. If class attribute store_data_locally is true, data is additionally - stored locally using given names for file and meta file. 
- - :param file_name: name of file to save data to (containing full path) - :param meta_file: name of the meta data file (also containing full path) - - :return: downloaded data and its meta data - """ - df_all = {} - df_era5, df_toar = None, None - meta_era5, meta_toar = None, None - if data_origin is not None: - era5_origin = filter_dict_by_value(data_origin, "era5", True) - era5_stats = select_from_dict(statistics_per_var, era5_origin.keys()) - toar_origin = filter_dict_by_value(data_origin, "era5", False) - toar_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False) - assert len(era5_origin) + len(toar_origin) == len(data_origin) - assert len(era5_stats) + len(toar_stats) == len(statistics_per_var) - else: - era5_origin, toar_origin = None, None - era5_stats, toar_stats = statistics_per_var, statistics_per_var - - # load data - if era5_origin is not None and len(era5_stats) > 0: - # load era5 data - df_era5, meta_era5 = data_sources.era5.load_era5(station_name=station, stat_var=era5_stats, - sampling=sampling, data_origin=era5_origin) - if toar_origin is None or len(toar_stats) > 0: - # load combined data from toar-data (v2 & v1) - df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats, - sampling=sampling, data_origin=toar_origin) - - if df_era5 is None and df_toar is None: - raise data_sources.toar_data.EmptyQueryResult(f"No data available for era5 and toar-data") - - df = pd.concat([df_era5, df_toar], axis=1, sort=True) - if meta_era5 is not None and meta_toar is not None: - meta = meta_era5.combine_first(meta_toar) - else: - meta = meta_era5 if meta_era5 is not None else meta_toar - meta.loc["data_origin"] = str(data_origin) - meta.loc["statistics_per_var"] = str(statistics_per_var) - - df_all[station[0]] = df - # convert df_all to xarray - xarr = {k: xr.DataArray(v, dims=[time_dim, target_dim]) for k, v in df_all.items()} - xarr = xr.Dataset(xarr).to_array(dim=iter_dim) - if store_data_locally is True: - # save locally as nc/csv file - xarr.to_netcdf(path=file_name) - meta.to_csv(meta_file) - return xarr, meta - @staticmethod def check_station_meta(meta, station, data_origin, statistics_per_var): """ @@ -426,14 +385,14 @@ class DataHandlerSingleStation(AbstractDataHandler): Will raise a FileNotFoundError if the values mismatch. """ - check_dict = {"data_origin": str(data_origin), "statistics_per_var": str(statistics_per_var)} + check_dict = {"data_origin": data_origin, "statistics_per_var": statistics_per_var} for (k, v) in check_dict.items(): if v is None or k not in meta.index: continue - if meta.at[k, station[0]] != v: - logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != " - f"{meta.at[k, station[0]]} (local). Raise FileNotFoundError to trigger new " - f"grapping from web.") + m = ast.literal_eval(meta.at[k, station[0]]) + if not check_nested_equality(select_from_dict(m, v.keys()), v): + logging.debug(f"{station[0]}: meta data does not agree with given request for {k}: {v} (requested) != " + f"{m} (local). 
Raise FileNotFoundError to trigger new grapping from web.") raise FileNotFoundError def check_for_negative_concentrations(self, data: xr.DataArray, minimum: int = 0) -> xr.DataArray: @@ -451,7 +410,7 @@ class DataHandlerSingleStation(AbstractDataHandler): """ used_chem_vars = list(set(self.chem_vars) & set(data.coords[self.target_dim].values)) if len(used_chem_vars) > 0: - data.loc[..., used_chem_vars] = data.loc[..., used_chem_vars].clip(min=minimum) + data = data.sel({self.target_dim: used_chem_vars}).clip(min=minimum).combine_first(data) return data def setup_data_path(self, data_path: str, sampling: str): diff --git a/mlair/data_handler/data_handler_with_filter.py b/mlair/data_handler/data_handler_with_filter.py index e5760e9afb52f9d55071214fb632601d744f124e..ad6ccaaf8e3bd49e1337bbd6cafbe44f9ebb1019 100644 --- a/mlair/data_handler/data_handler_with_filter.py +++ b/mlair/data_handler/data_handler_with_filter.py @@ -374,7 +374,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation def apply_filter(self): """Apply FIR filter only on inputs.""" self.apriori = self.apriori.get(str(self)) if isinstance(self.apriori, dict) else self.apriori - logging.info(f"{self.station}: call ClimateFIRFilter") + logging.info(f"{self.station[0]}: call ClimateFIRFilter") climate_filter = ClimateFIRFilter(self.input_data.astype("float32"), self.fs, self.filter_order, self.filter_cutoff_freq, self.filter_window_type, time_dim=self.time_dim, var_dim=self.target_dim, @@ -383,7 +383,8 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation plot_path=self.plot_path, minimum_length=self.window_history_size, new_dim=self.window_dim, display_name=self.station[0], extend_length_opts=self.extend_length_opts, - offset=self.window_history_end, plot_dates=self.plot_dates) + extend_end=self.window_history_end, plot_dates=self.plot_dates, + offset=self.window_history_offset) self.climate_filter_coeff = climate_filter.filter_coefficients # store apriori information: store all if residuum_stat method was used, otherwise just store initial apriori @@ -459,12 +460,7 @@ class DataHandlerClimateFirFilterSingleStation(DataHandlerFirFilterSingleStation :param window: this parameter is not used in the inherited method :param dim_name_of_shift: Dimension along shift will be applied """ - data = self.input_data - sampling = {"daily": "D", "hourly": "h"}.get(to_list(self.sampling)[0]) - data.coords[dim_name_of_shift] = data.coords[dim_name_of_shift] - np.timedelta64(self.window_history_offset, - sampling) - data.coords[self.window_dim] = data.coords[self.window_dim] + self.window_history_offset - self.history = data + self.history = self.input_data # from matplotlib import pyplot as plt # d = self.load_and_interpolate(0) # data.sel(datetime="2007-07-07 00:00").sum("filter").plot() diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index 69c9537b10ca583adf84480636680a99ab265a67..154f057622cf35d2af0810357e3f193307c6c499 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -22,7 +22,7 @@ import xarray as xr from mlair.data_handler.abstract_data_handler import AbstractDataHandler from mlair.helpers import remove_items, to_list, TimeTrackingWrapper -from mlair.helpers.data_sources.toar_data import EmptyQueryResult +from mlair.helpers.data_sources.data_loader import EmptyQueryResult number = Union[float, int] diff --git a/mlair/helpers/data_sources/__init__.py 
b/mlair/helpers/data_sources/__init__.py index 6b753bc3afb961be65ff0f934ef4f0de08804a0b..34c70c5e1ae298bc880b6856ad93691f64415991 100644 --- a/mlair/helpers/data_sources/__init__.py +++ b/mlair/helpers/data_sources/__init__.py @@ -5,6 +5,7 @@ The module data_sources collects different data sources, namely ERA5, TOAR-Data """ __author__ = "Lukas Leufen" -__date__ = "2022-07-05" +__date__ = "2023-06-01" -from . import era5, join, toar_data, toar_data_v2 +from . import era5, join, toar_data, toar_data_v2, data_loader, ifs +from .data_loader import download_data diff --git a/mlair/helpers/data_sources/data_loader.py b/mlair/helpers/data_sources/data_loader.py new file mode 100644 index 0000000000000000000000000000000000000000..fd0b5042a9525fdf28a67a064857a7aabd8b0b39 --- /dev/null +++ b/mlair/helpers/data_sources/data_loader.py @@ -0,0 +1,218 @@ +__author__ = 'Lukas Leufen' +__date__ = '2023-06-01' + +import logging +from typing import Dict, Union, List +import time +import random + +import requests +from requests.adapters import HTTPAdapter, Retry +# from requests.packages.urllib3.util.retry import Retry + +from mlair.helpers import filter_dict_by_value, select_from_dict, data_sources, TimeTracking +import pandas as pd +import xarray as xr + +DEFAULT_TIME_DIM = "datetime" +DEFAULT_TARGET_DIM = "variables" +DEFAULT_ITER_DIM = "Stations" +DEFAULT_WINDOW_DIM = "window" + + +def download_data(file_name: str, meta_file: str, station, statistics_per_var, sampling, + store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM, + target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM, window_dim=DEFAULT_WINDOW_DIM, + era5_data_path=None, era5_file_names=None, ifs_data_path=None, ifs_file_names=None) -> \ + [xr.DataArray, pd.DataFrame]: + """ + Download data from TOAR database using the JOIN interface or load local era5 data. + + Data is transformed to a xarray dataset. If class attribute store_data_locally is true, data is additionally + stored locally using given names for file and meta file. 
+ + :param file_name: name of file to save data to (containing full path) + :param meta_file: name of the meta data file (also containing full path) + + :return: downloaded data and its meta data + """ + df_era5_local, df_toar, df_ifs_local = None, None, None + meta_era5_local, meta_toar, meta_ifs_local = None, None, None + if data_origin is not None: + era5_local_origin = filter_dict_by_value(data_origin, "era5_local", True) + era5_local_stats = select_from_dict(statistics_per_var, era5_local_origin.keys()) + ifs_local_origin = filter_dict_by_value(data_origin, "ifs", True) + ifs_local_stats = select_from_dict(statistics_per_var, ifs_local_origin.keys()) + toar_origin = filter_dict_by_value(data_origin, ["era5_local", "ifs"], False) + toar_stats = select_from_dict(statistics_per_var, toar_origin.keys()) + assert len(era5_local_origin) + len(toar_origin) + len(ifs_local_origin) == len(data_origin) + assert len(era5_local_stats) + len(toar_stats) + len(ifs_local_stats) == len(statistics_per_var) + else: + era5_local_origin, toar_origin, ifs_local_origin = None, None, None + era5_local_stats, toar_stats, ifs_local_stats = statistics_per_var, statistics_per_var, statistics_per_var + + # load data + if era5_local_origin is not None and len(era5_local_stats) > 0: + # load era5 data + df_era5_local, meta_era5_local = data_sources.era5.load_era5( + station_name=station, stat_var=era5_local_stats, sampling=sampling, data_origin=era5_local_origin, + window_dim=window_dim, time_dim=time_dim, target_dim=target_dim, era5_data_path=era5_data_path, + era5_file_names=era5_file_names) + if ifs_local_origin is not None and len(ifs_local_stats) > 0: + # load ifs data + df_ifs_local, meta_ifs_local = data_sources.ifs.load_ifs( + station_name=station, stat_var=ifs_local_stats, sampling=sampling, data_origin=ifs_local_origin, + lead_time_dim=window_dim, initial_time_dim=time_dim, target_dim=target_dim, ifs_data_path=ifs_data_path, + ifs_file_names=ifs_file_names) + if toar_origin is None or len(toar_stats) > 0: + # load combined data from toar-data (v2 & v1) + df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats, + sampling=sampling, data_origin=toar_origin, + window_dim=window_dim, time_dim=time_dim, + target_dim=target_dim) + + valid_df = [e for e in [df_era5_local, df_toar, df_ifs_local] if e is not None] + if len(valid_df) == 0: + raise EmptyQueryResult(f"No data available for era5_local, toar-data and ifs_local") + df = xr.concat(valid_df, dim=time_dim) + valid_meta = [e for e in [meta_era5_local, meta_toar, meta_ifs_local] if e is not None] + if len(valid_meta) > 0: + meta = valid_meta[0] + for e in valid_meta[1:]: + meta = meta.combine_first(e) + else: + meta = None + meta.loc["data_origin"] = str(data_origin) + meta.loc["statistics_per_var"] = str(statistics_per_var) + + xarr = df.expand_dims({iter_dim: station}) + if len(xarr.coords[window_dim]) <= 1: # keep window dim only if there is more than a single entry + xarr = xarr.squeeze(window_dim, drop=True) + if store_data_locally is True: + # save locally as nc/csv file + xarr.to_netcdf(path=file_name) + meta.to_csv(meta_file) + return xarr, meta + + +class EmptyQueryResult(Exception): + """Exception that gets raised if a query to JOIN returns empty results.""" + + pass + + +def get_data_with_query(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5, timeout_base=60) -> bytes: + """ + Download data from statistics rest api. 
This API is based on three steps: (1) post query and retrieve job id, (2) + read status of id until finished, (3) download data with job id. + """ + url = create_url(**opts) + response_error = None + for retry in range(max_retries + 1): + time.sleep(random.random()) + try: + timeout = timeout_base * (2 ** retry) + logging.info(f"connect (retry={retry}, timeout={timeout}) {url}") + start_time = time.time() + with TimeTracking(name=url): + session = retries_session(max_retries=0) + response = session.get(url, headers=headers, timeout=(5, 5)) # timeout=(open, read) + while (time.time() - start_time) < timeout: + response = requests.get(response.json()["status"], timeout=(5, 5)) + if response.history: + break + time.sleep(2) + return response.content + except Exception as e: + time.sleep(retry) + logging.debug(f"There was an error for request {url}: {e}") + response_error = e + if retry + 1 >= max_retries: + raise EmptyQueryResult(f"There was a RetryError for request {url}: {response_error}") + + +def get_data(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5, timeout_base=60) -> Union[Dict, List, str]: + """ + Download join data using requests framework. + + Data is returned as json like structure. Depending on the response structure, this can lead to a list or dictionary. + + :param opts: options to create the request url + :param headers: additional headers information like authorization, can be empty + :param as_json: extract response as json if true (default True) + + :return: requested data (either as list or dictionary) + """ + url = create_url(**opts) + response_error = None + for retry in range(max_retries + 1): + time.sleep(random.random()) + try: + timeout = timeout_base * (2 ** retry) + logging.info(f"connect (retry={retry}, timeout={timeout}) {url}") + with TimeTracking(name=url): + session = retries_session(max_retries=0) + response = session.get(url, headers=headers, timeout=(5, timeout)) # timeout=(open, read) + if response.status_code == 200: + return response.json() if as_json is True else response.text + else: + logging.debug(f"There was an error (STATUS {response.status_code}) for request {url}") + response_error = f"STATUS {response.status_code}" + except Exception as e: + time.sleep(2 * (2 ** retry)) + logging.debug(f"There was an error for request {url}: {e}") + response_error = e + if retry + 1 >= max_retries: + raise EmptyQueryResult(f"There was a RetryError for request {url}: {response_error}") + + +def correct_stat_name(stat: str) -> str: + """ + Map given statistic name to new namespace defined by mapping dict. + + Return given name stat if not element of mapping namespace. + + :param stat: namespace from JOIN server + + :return: stat mapped to local namespace + """ + mapping = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'} + return mapping.get(stat, stat) + + +def create_url(base: str, service: str, param_id: Union[str, int, None] = None, + **kwargs: Union[str, int, float, None]) -> str: + """ + Create a request url with given base url, service type and arbitrarily many additional keyword arguments. + + :param base: basic url of the rest service + :param service: service type, e.g. series, stats + :param param_id: id for a distinct service, is added between ending / of service and ? of kwargs + :param kwargs: keyword pairs for optional request specifications, e.g. 
'statistics=maximum' + + :return: combined url as string + """ + url = f"{base}" + if not url.endswith("/"): + url += "/" + if service is not None: + url = f"{url}{service}" + if not url.endswith("/"): + url += "/" + if param_id is not None: + url = f"{url}{param_id}" + if len(kwargs) > 0: + url = f"{url}?{'&'.join(f'{k}={v}' for k, v in kwargs.items() if v is not None)}" + return url + + +def retries_session(max_retries=5): + retry_strategy = Retry(total=max_retries, + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 504], + method_whitelist=["HEAD", "GET", "OPTIONS"]) + adapter = HTTPAdapter(max_retries=retry_strategy) + http = requests.Session() + http.mount("https://", adapter) + http.mount("http://", adapter) + return http diff --git a/mlair/helpers/data_sources/era5.py b/mlair/helpers/data_sources/era5.py index 8eb7a03b2629db1d006e03fcc9d30b2af714c270..14646231a243a6aff7ab1b80e2db7dcb7cf2869c 100644 --- a/mlair/helpers/data_sources/era5.py +++ b/mlair/helpers/data_sources/era5.py @@ -4,6 +4,7 @@ __date__ = "2022-06-09" import logging import os +from functools import partial import pandas as pd import xarray as xr @@ -12,17 +13,18 @@ from mlair import helpers from mlair.configuration.era5_settings import era5_settings from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data, correct_timezone -from mlair.helpers.data_sources.toar_data import EmptyQueryResult +from mlair.helpers.data_sources.data_loader import EmptyQueryResult from mlair.helpers.meteo import relative_humidity_from_dewpoint -def load_era5(station_name, stat_var, sampling, data_origin): +def load_era5(station_name, stat_var, sampling, data_origin, time_dim, window_dim, target_dim, era5_data_path=None, + era5_file_names=None): # make sure station_name parameter is a list station_name = helpers.to_list(station_name) # get data path - data_path, file_names = era5_settings(sampling) + data_path, file_names = era5_settings(sampling, era5_data_path=era5_data_path, era5_file_names=era5_file_names) # correct stat_var values if data is not aggregated (hourly) if sampling == "hourly": @@ -37,43 +39,53 @@ def load_era5(station_name, stat_var, sampling, data_origin): # sel data for station using sel method nearest logging.info(f"load data for {station_meta['codes'][0]} from ERA5") try: - with xr.open_mfdataset(os.path.join(data_path, file_names)) as data: - lon, lat = station_meta["coordinates"]["lng"], station_meta["coordinates"]["lat"] - station_dask = data.sel(lon=lon, lat=lat, method="nearest", drop=True) - station_data = station_dask.to_array().T.compute() + lon, lat = station_meta["coordinates"]["lng"], station_meta["coordinates"]["lat"] + file_names = os.path.join(data_path, file_names) + with xr.open_mfdataset(file_names, preprocess=partial(preprocess_era5_single_file, lon, lat)) as data: + station_data = data.to_array().T.compute() except OSError as e: logging.info(f"Cannot load era5 data from path {data_path} and filenames {file_names} due to: {e}") return None, None - # transform data and meta to pandas - station_data = station_data.to_pandas() if "relhum" in stat_var: - station_data["RHw"] = relative_humidity_from_dewpoint(station_data["D2M"], station_data["T2M"]) - station_data.columns = _rename_era5_variables(station_data.columns) + relhum = relative_humidity_from_dewpoint(station_data.sel(variable="D2M"), station_data.sel(variable="T2M")) + station_data = xr.concat([station_data, 
relhum.expand_dims({"variable": ["RHw"]})], dim="variable") + station_data.coords["variable"] = _rename_era5_variables(station_data.coords["variable"].values) # check if all requested variables are available - if set(stat_var).issubset(station_data.columns) is False: - missing_variables = set(stat_var).difference(stat_var) + if set(stat_var).issubset(station_data.coords["variable"].values) is False: + missing_variables = set(stat_var).difference(station_data.coords["variable"].values) origin = helpers.select_from_dict(data_origin, missing_variables) options = f"station={station_name}, origin={origin}" raise EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in JOIN.") else: - station_data = station_data[stat_var] + station_data = station_data.sel(variable=list(stat_var.keys())) # convert to local timezone - station_data = correct_timezone(station_data, station_meta, sampling) + station_data.coords["time"] = correct_timezone(station_data.to_pandas(), station_meta, sampling).index + station_data = station_data.rename({"time": time_dim, "variable": target_dim}) - variable_meta = _emulate_meta_data(station_data) + # expand window_dim + station_data = station_data.expand_dims({window_dim: [0]}) + + # create meta data + variable_meta = _emulate_meta_data(station_data.coords[target_dim].values) meta = combine_meta_data(station_meta, variable_meta) meta = pd.DataFrame.from_dict(meta, orient='index') meta.columns = station_name return station_data, meta -def _emulate_meta_data(station_data): +def preprocess_era5_single_file(lon, lat, ds): + """Select lon and lat from data file and transform valid time into lead time.""" + ds = ds.sel(lon=lon, lat=lat, method="nearest", drop=True) + return ds + + +def _emulate_meta_data(variables): general_meta = {"sampling_frequency": "hourly", "data_origin": "model", "data_origin_type": "model"} roles_meta = {"roles": [{"contact": {"organisation": {"name": "ERA5", "longname": "ECMWF"}}}]} - variable_meta = {var: {"variable": {"name": var}, **roles_meta, ** general_meta} for var in station_data.columns} + variable_meta = {var: {"variable": {"name": var}, **roles_meta, ** general_meta} for var in variables} return variable_meta diff --git a/mlair/helpers/data_sources/ifs.py b/mlair/helpers/data_sources/ifs.py new file mode 100644 index 0000000000000000000000000000000000000000..ae16c33b968c43d992beaccab07a6a915870955e --- /dev/null +++ b/mlair/helpers/data_sources/ifs.py @@ -0,0 +1,124 @@ +"""Methods to load ifs data.""" +__author__ = "Lukas Leufen, Michael Langgut" +__date__ = "2023-06-07" + +import logging +import os +import re +import glob +from functools import partial + +import numpy as np +import pandas as pd +import xarray as xr + +from mlair import helpers +from mlair.configuration.ifs_settings import ifs_settings +from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings +from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data, correct_timezone +from mlair.helpers.data_sources.data_loader import EmptyQueryResult +from mlair.helpers.meteo import relative_humidity_from_dewpoint + + +def load_ifs(station_name, stat_var, sampling, data_origin, lead_time_dim, initial_time_dim, target_dim, + ifs_data_path=None, ifs_file_names=None): + + # make sure station_name parameter is a list + station_name = helpers.to_list(station_name) + + # get data path + data_path, file_names = ifs_settings(sampling, ifs_data_path=ifs_data_path, ifs_file_names=ifs_file_names) + + # correct 
stat_var values if data is not aggregated (hourly) + if sampling == "hourly": + stat_var = {key: "values" for key in stat_var.keys()} + else: + raise ValueError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.") + + # load station meta using toar-data v2 API + meta_url_base, headers = toar_data_v2_settings("meta") + station_meta = load_station_information(station_name, meta_url_base, headers) + + # sel data for station using sel method nearest + logging.info(f"load data for {station_meta['codes'][0]} from IFS") + try: + lon, lat = station_meta["coordinates"]["lng"], station_meta["coordinates"]["lat"] + file_names = sort_ifs_files(data_path) + with xr.open_mfdataset(file_names, preprocess=partial(preprocess_ifs_single_file, lon, lat), + concat_dim="initial_time", combine="nested") as data: + station_data = data.to_array().T.compute() + except OSError as e: + logging.info(f"Cannot load ifs data from path {data_path} and filenames {file_names} due to: {e}") + return None, None + + if "relhum" in stat_var: + relhum = relative_humidity_from_dewpoint(station_data.sel(variable="d2m"), station_data.sel(variable="t2m")) + station_data = xr.concat([station_data, relhum.expand_dims({"variable": ["rhw"]})], dim="variable") + station_data.coords["variable"] = _rename_ifs_variables(station_data.coords["variable"].values) + + # check if all requested variables are available + if set(stat_var).issubset(station_data.coords["variable"].values) is False: + missing_variables = set(stat_var).difference(station_data.coords["variable"].values) + origin = helpers.select_from_dict(data_origin, missing_variables) + options = f"station={station_name}, origin={origin}" + raise EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in JOIN.") + else: + station_data = station_data.sel(variable=list(stat_var.keys())) + + # convert to local timezone + station_data.coords["initial_time"] = correct_timezone(station_data.sel(lead_time=0).to_pandas(), station_meta, + sampling).index + + # rename lead time and initial time to MLAir's internal dimension names + station_data = station_data.rename({"lead_time": lead_time_dim, "initial_time": initial_time_dim, + "variable": target_dim}) + + variable_meta = _emulate_meta_data(station_data.coords[target_dim].values) + meta = combine_meta_data(station_meta, variable_meta) + meta = pd.DataFrame.from_dict(meta, orient='index') + meta.columns = station_name + return station_data, meta + + +def sort_ifs_files(data_path, pattern="sfc_*.nc"): + def sort_by_date(file_name): + match = re.search(r'(\d{8})_(\d{2})', file_name) + if match: + return match.group(1), match.group(2) + file_names = glob.glob(os.path.join(data_path, pattern)) + return sorted(file_names, key=sort_by_date) + + +def preprocess_ifs_single_file(lon, lat, ds): + """Select lon and lat from data file and transform valid time into lead time.""" + ds = ds.sel(longitude=lon, latitude=lat, method="nearest", drop=True) + return expand_dims_initial_time(ds) + + +def expand_dims_initial_time(ds): + """Create lead time from initial time and valid time.""" + initial_time = ds.time[0] + lead_time = (ds.time - initial_time) / np.timedelta64(1, "h") + # ds = ds.expand_dims(dim={"initial_time": [initial_time.values], "lead_time": lead_time}, axis=(0, 1)) + ds.coords["time"] = lead_time + ds = ds.rename({"time": "lead_time"}) + ds = ds.expand_dims(dim={"initial_time": [initial_time.values]}, axis=0) + return ds + + +def _emulate_meta_data(variables): + general_meta = 
{"sampling_frequency": "hourly", "data_origin": "model", "data_origin_type": "model"} + roles_meta = {"roles": [{"contact": {"organisation": {"name": "IFS", "longname": "ECMWF"}}}]} + variable_meta = {var: {"variable": {"name": var}, **roles_meta, ** general_meta} for var in variables} + return variable_meta + + +def _rename_ifs_variables(ifs_names): + mapper = {"sp": "press", "u10": "u", "v10": "v", "t2m": "temp", "d2m": "dew", "blh": "pblheight", + "tcc": "cloudcover", "rhw": "relhum"} + ifs_names = list(ifs_names) + try: + join_names = list(map(lambda x: mapper.get(x, x), ifs_names)) + return join_names + except KeyError as e: + raise KeyError(f"Cannot map names from ifs to join naming convention: {e}") diff --git a/mlair/helpers/data_sources/join.py b/mlair/helpers/data_sources/join.py index a978b2712a83b21f3c1256b2bf0826da63bdda3a..3a97717e0104a21574618304c34d5162fb1907ed 100644 --- a/mlair/helpers/data_sources/join.py +++ b/mlair/helpers/data_sources/join.py @@ -10,7 +10,7 @@ import pandas as pd from mlair import helpers from mlair.configuration.join_settings import join_settings -from mlair.helpers.data_sources import toar_data, toar_data_v2 +from mlair.helpers.data_sources import toar_data, toar_data_v2, data_loader # join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/' @@ -49,8 +49,8 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t missing_variables = set(stat_var).difference(vars_dict) origin = helpers.select_from_dict(data_origin, missing_variables) options = f"station={station_name}, type={station_type}, network={network_name}, origin={origin}" - raise toar_data.EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in " - f"JOIN.") + raise data_loader.EmptyQueryResult( + f"No data found for variables {missing_variables} and options {options} in JOIN.") # correct stat_var values if data is not aggregated (hourly) if sampling == "hourly": @@ -71,7 +71,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t 'sampling': sampling, 'capture': 0, 'format': 'json'} # load data - data = toar_data.get_data(opts, headers) + data = data_loader.get_data(opts, headers) # adjust data format if given as list of list # no branch cover because this just happens when downloading hourly data using a secret token, not available @@ -80,7 +80,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t data = correct_data_format(data) # correct namespace of statistics - stat = toar_data.correct_stat_name(stat_var[var]) + stat = data_loader.correct_stat_name(stat_var[var]) # store data in pandas dataframe df = _save_to_pandas(df, data, stat, var) @@ -100,7 +100,7 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t meta.columns = station_name return df, meta else: - raise toar_data.EmptyQueryResult("No data found in JOIN.") + raise data_loader.EmptyQueryResult("No data found in JOIN.") def _correct_meta(meta): @@ -214,7 +214,7 @@ def load_series_information(station_name: List[str], station_type: str_or_none, opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type, "network_name": network_name_opts, "as_dict": "true", "parameter_name": parameter_name_opts, "columns": "id,network_name,station_id,parameter_name,parameter_label,parameter_attribute"} - station_vars = toar_data.get_data(opts, headers) + station_vars = data_loader.get_data(opts, headers) 
logging.debug(f"{station_name}: {station_vars}") return _select_distinct_series(station_vars, data_origin, network_name) diff --git a/mlair/helpers/data_sources/toar_data.py b/mlair/helpers/data_sources/toar_data.py index 27522855cbe0f3c6f0b78d1598709a694fc7b862..f5c12df497acdef5c72490496914439558e13eea 100644 --- a/mlair/helpers/data_sources/toar_data.py +++ b/mlair/helpers/data_sources/toar_data.py @@ -1,86 +1,15 @@ __author__ = "Lukas Leufen" __date__ = "2022-07-05" - -from typing import Union, List, Dict - from . import join, toar_data_v2 import requests -from requests.adapters import HTTPAdapter -from requests.packages.urllib3.util.retry import Retry import pandas as pd +from .data_loader import EmptyQueryResult +import xarray as xr -class EmptyQueryResult(Exception): - """Exception that get raised if a query to JOIN returns empty results.""" - - pass - - -def create_url(base: str, service: str, param_id: Union[str, int, None] = None, - **kwargs: Union[str, int, float, None]) -> str: - """ - Create a request url with given base url, service type and arbitrarily many additional keyword arguments. - - :param base: basic url of the rest service - :param service: service type, e.g. series, stats - :param param_id: id for a distinct service, is added between ending / of service and ? of kwargs - :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum' - - :return: combined url as string - """ - url = f"{base}" - if not url.endswith("/"): - url += "/" - if service is not None: - url = f"{url}{service}" - if not url.endswith("/"): - url += "/" - if param_id is not None: - url = f"{url}{param_id}" - if len(kwargs) > 0: - url = f"{url}?{'&'.join(f'{k}={v}' for k, v in kwargs.items() if v is not None)}" - return url - - -def get_data(opts: Dict, headers: Dict, as_json: bool = True) -> Union[Dict, List, str]: - """ - Download join data using requests framework. - - Data is returned as json like structure. Depending on the response structure, this can lead to a list or dictionary. 
- - :param opts: options to create the request url - :param headers: additional headers information like authorization, can be empty - :param as_json: extract response as json if true (default True) - - :return: requested data (either as list or dictionary) - """ - url = create_url(**opts) - try: - response = retries_session().get(url, headers=headers, timeout=(5, None)) # timeout=(open, read) - if response.status_code == 200: - return response.json() if as_json is True else response.text - else: - raise EmptyQueryResult(f"There was an error (STATUS {response.status_code}) for request {url}") - except requests.exceptions.RetryError as e: - raise EmptyQueryResult(f"There was an RetryError for request {url}: {e}") - - -def retries_session(max_retries=3): - retry_strategy = Retry(total=max_retries, - backoff_factor=0.1, - status_forcelist=[429, 500, 502, 503, 504], - method_whitelist=["HEAD", "GET", "OPTIONS"]) - adapter = HTTPAdapter(max_retries=retry_strategy) - http = requests.Session() - http.mount("https://", adapter) - http.mount("http://", adapter) - return http - - -def download_toar(station, toar_stats, sampling, data_origin): - +def download_toar(station, toar_stats, sampling, data_origin, window_dim, time_dim, target_dim): try: # load data from toar-data (v2) df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling, data_origin=data_origin) @@ -101,7 +30,9 @@ def download_toar(station, toar_stats, sampling, data_origin): else: df_merged = df_toar if df_toar is not None else df_join meta_merged = meta_toar if df_toar is not None else meta_join - return df_merged, meta_merged + xarr_merged = xr.DataArray(df_merged, dims=[time_dim, target_dim]) + xarr_merged = xarr_merged.expand_dims({window_dim: [0]}) + return xarr_merged, meta_merged def merge_toar_join(df_toar, df_join, sampling): @@ -112,17 +43,3 @@ def merge_toar_join(df_toar, df_join, sampling): full_data = df_toar.reindex(full_time) full_data.update(df_join, overwrite=False) return full_data - - -def correct_stat_name(stat: str) -> str: - """ - Map given statistic name to new namespace defined by mapping dict. - - Return given name stat if not element of mapping namespace. 
- - :param stat: namespace from JOIN server - - :return: stat mapped to local namespace - """ - mapping = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'} - return mapping.get(stat, stat) diff --git a/mlair/helpers/data_sources/toar_data_v2.py b/mlair/helpers/data_sources/toar_data_v2.py index 0fa53a7eb23f11675eeef2c12a7d5dceec3c38ac..70b6b2d7fae453fb28da27c5eb59dd8b06e18935 100644 --- a/mlair/helpers/data_sources/toar_data_v2.py +++ b/mlair/helpers/data_sources/toar_data_v2.py @@ -10,11 +10,12 @@ from io import StringIO import pandas as pd import pytz from timezonefinder import TimezoneFinder +from io import BytesIO +import zipfile from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings from mlair.helpers import to_list -from mlair.helpers.data_sources.toar_data import EmptyQueryResult, get_data, correct_stat_name - +from mlair.helpers.data_sources.data_loader import EmptyQueryResult, get_data, correct_stat_name, get_data_with_query str_or_none = Union[str, None] @@ -121,9 +122,9 @@ def prepare_meta(meta, sampling, stat_var, var): for m in meta: opts = {} if sampling == "daily": - opts["timeseries_id"] = m.pop("id") + opts["id"] = m.pop("id") m["id"] = None - opts["names"] = stat_var[var] + opts["statistics"] = stat_var[var] opts["sampling"] = sampling out.append(([m], opts)) return out @@ -168,19 +169,33 @@ def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling): series_id = meta["id"] # opts = {"base": url_base, "service": f"data/timeseries/{series_id}"} opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv", **opts} - if sampling != "hourly": + if sampling == "hourly": + res = get_data(opts, headers, as_json=False) + data = extract_timeseries_data(res, "string") + else: opts["service"] = None - res = get_data(opts, headers, as_json=False) - data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True, - infer_datetime_format=True) + opts["format"] = None + res = get_data_with_query(opts, headers, as_json=False) + data = extract_timeseries_data(res, "bytes") if len(data.index) > 0: - data = data[correct_stat_name(opts.get("names", "value"))].rename(meta["variable"]["name"]) + data = data[correct_stat_name(opts.get("statistics", "value"))].rename(meta["variable"]["name"]) coll.append(data) return coll +def extract_timeseries_data(result, result_format): + if result_format == "string": + return pd.read_csv(StringIO(result), comment="#", index_col="datetime", parse_dates=True, + infer_datetime_format=True) + elif result_format == "bytes": + with zipfile.ZipFile(BytesIO(result)) as file: + return pd.read_csv(BytesIO(file.read(file.filelist[0].filename)), comment="#", index_col="datetime", + parse_dates=True) + else: + raise ValueError(f"Unknown result format given: {result_format}") + + def load_station_information(station_name: List[str], url_base: str, headers: Dict): - # opts = {"base": url_base, "service": f"stationmeta/{station_name[0]}"} opts = {"base": url_base, "service": f"stationmeta", "param_id": station_name[0]} return get_data(opts, headers) @@ -223,10 +238,14 @@ def select_timeseries_by_origin(toar_meta, var_origin): res = [] for origin in to_list(var_origin): for meta in toar_meta: - for roles in meta["roles"]: - if roles["contact"]["organisation"]["name"].lower() == origin.lower(): - res.append(meta) - break + if meta["data_origin"] == "instrument": + for roles in meta["roles"]: + if roles["contact"]["organisation"]["name"].lower() == origin.lower(): + 
res.append(meta) + break + elif meta["data_origin"].lower() == origin.lower(): + res.append(meta) + break return res diff --git a/mlair/helpers/filter.py b/mlair/helpers/filter.py index 097eaf97e6ef8023abdc59e2b9e77c4d56aa25df..ad4eb57cf2ad1e51dba50ffb02e84b37e1644626 100644 --- a/mlair/helpers/filter.py +++ b/mlair/helpers/filter.py @@ -7,7 +7,7 @@ import datetime import numpy as np import pandas as pd from matplotlib import pyplot as plt -from scipy import signal +from scipy import signal, stats import xarray as xr import dask.array as da @@ -18,7 +18,7 @@ class FIRFilter: from mlair.plotting.data_insight_plotting import PlotFirFilter def __init__(self, data, fs, order, cutoff, window, var_dim, time_dim, display_name=None, minimum_length=None, - offset=0, plot_path=None, plot_dates=None): + extend_end=0, plot_path=None, plot_dates=None, offset=0): self._filtered = [] self._h = [] self.data = data @@ -30,6 +30,7 @@ class FIRFilter: self.time_dim = time_dim self.display_name = display_name self.minimum_length = minimum_length + self.extend_end = extend_end self.offset = offset self.plot_path = plot_path self.plot_dates = plot_dates @@ -57,7 +58,7 @@ class FIRFilter: # visualization plot_data.append(self.create_visualization(fi, input_data, self.plot_dates, self.time_dim, self.fs, hi, - self.minimum_length, self.order, i, self.offset, self.var_dim)) + self.minimum_length, self.order, i, self.extend_end, self.var_dim)) # calculate residuum input_data = input_data - fi @@ -75,7 +76,7 @@ class FIRFilter: logging.info(f"Could not plot climate fir filter due to following reason:\n{e}") def create_visualization(self, filtered, filter_input_data, plot_dates, time_dim, sampling, - h, minimum_length, order, i, offset, var_dim): # pragma: no cover + h, minimum_length, order, i, extend_end, var_dim): # pragma: no cover plot_data = [] minimum_length = minimum_length or 0 for viz_date in set(plot_dates).intersection(filtered.coords[time_dim].values): @@ -88,7 +89,7 @@ class FIRFilter: extend_length_history = minimum_length + int((length + 1) / 2) extend_length_future = int((length + 1) / 2) + 1 t_minus = viz_date + np.timedelta64(int(-extend_length_history), td_type) - t_plus = viz_date + np.timedelta64(int(extend_length_future + offset), td_type) + t_plus = viz_date + np.timedelta64(int(extend_length_future + extend_end), td_type) time_slice = slice(t_minus, t_plus - np.timedelta64(1, td_type)) plot_data.append({"t0": viz_date, "filter_input": filter_input_data.sel({time_dim: time_slice}), "filtered": filtered.sel({time_dim: time_slice}), "h": h, "time_dim": time_dim, @@ -174,7 +175,7 @@ class ClimateFIRFilter(FIRFilter): def __init__(self, data, fs, order, cutoff, window, time_dim, var_dim, apriori=None, apriori_type=None, apriori_diurnal=False, sel_opts=None, plot_path=None, minimum_length=None, new_dim=None, display_name=None, extend_length_opts: int = 0, - offset: Union[dict, int] = 0, plot_dates=None): + extend_end: Union[dict, int] = 0, plot_dates=None, offset: int = 0): """ :param data: data to filter :param fs: sampling frequency in 1/days -> 1d: fs=1 -> 1H: fs=24 @@ -196,11 +197,12 @@ class ClimateFIRFilter(FIRFilter): :param extend_length_opts: shift information switch between historical data and apriori estimation by the given values (default None). Must either be a dictionary with keys available in var_dim or a single value that is applied to all data. 
This parameter has only influence on which information is available at t0 for the - filter calculcation but has no influence on the shape of the returned filtered data. - :param offset: This parameter indicates the number of time steps with ti>t0 to return of the filtered data. In - case the offset parameter is larger than the extend_lenght_opts parameter, this leads to the case that not - only observational data but also climatological estimations are returned. Must either be a dictionary with - keys available in var_dim or a single value that is applied to all data. Default is 0. + filter calculation but has no influence on the shape of the returned filtered data. + :param extend_end: This parameter indicates the number of time steps with ti>t0 to return of the filtered data. + In case the extend_end parameter is larger than the extend_length_opts parameter, this leads to the case + that not only observational data but also climatological estimations are returned. Must either be a + dictionary with keys available in var_dim or a single value that is applied to all data. Default is 0. + :param offset: Shift t0 by this timestep delta. Default is 0. """ self._apriori = apriori self.apriori_type = apriori_type @@ -211,15 +213,22 @@ class ClimateFIRFilter(FIRFilter): self.plot_data = [] self.extend_length_opts = extend_length_opts super().__init__(data, fs, order, cutoff, window, var_dim, time_dim, display_name=display_name, - minimum_length=minimum_length, plot_path=plot_path, offset=offset, plot_dates=plot_dates) + minimum_length=minimum_length, plot_path=plot_path, extend_end=extend_end, plot_dates=plot_dates, offset=offset) def run(self): filtered = [] h = [] + forecasts = None if self.sel_opts is not None: self.sel_opts = self.sel_opts if isinstance(self.sel_opts, dict) else {self.time_dim: self.sel_opts} self._check_sel_opts() sampling = {1: "1d", 24: "1H"}.get(int(self.fs)) + + if self.new_dim in self.data.coords: + pseudo_timeseries = self.create_pseudo_timeseries(self.data, self.time_dim, sampling, self.new_dim) + forecasts = self.data.__deepcopy__() + self.data = pseudo_timeseries + logging.debug(f"{self.display_name}: create diurnal_anomalies") if self.apriori_diurnal is True and sampling == "1H": diurnal_anomalies = self.create_seasonal_hourly_mean(self.data, self.time_dim, sel_opts=self.sel_opts, @@ -251,21 +260,23 @@ class ClimateFIRFilter(FIRFilter): minimum_length=self.minimum_length, new_dim=new_dim, plot_dates=plot_dates, display_name=self.display_name, extend_opts=self.extend_length_opts, - offset=self.offset, next_order=next_order) + extend_end=self.extend_end, next_order=next_order, + forecasts=forecasts, offset=self.offset) logging.info(f"{self.display_name}: finished clim_filter calculation") if self.minimum_length is None: - filtered.append(fi.sel({new_dim: slice(None, self.offset)})) + filtered.append(fi.sel({new_dim: slice(None, self.extend_end)})) else: - filtered.append(fi.sel({new_dim: slice(self.offset - self.minimum_length, self.offset)})) + filtered.append(fi.sel({new_dim: slice(self.extend_end - self.minimum_length, self.extend_end)})) h.append(hi) gc.collect() + forecasts = None # reset/disable forecasts after first filter iteration self.plot_data.append(plot_data) plot_dates = {e["t0"] for e in plot_data} # calculate residuum logging.info(f"{self.display_name}: calculate residuum") - coord_range = range(fi.coords[new_dim].values.min(), fi.coords[new_dim].values.max() + 1) + coord_range = range(int(fi.coords[new_dim].values.min()), 
int(fi.coords[new_dim].values.max() + 1)) if new_dim in input_data.coords: input_data = input_data.sel({new_dim: coord_range}) - fi else: @@ -293,9 +304,9 @@ class ClimateFIRFilter(FIRFilter): # add last residuum to filtered if self.minimum_length is None: - filtered.append(input_data.sel({new_dim: slice(None, self.offset)})) + filtered.append(input_data.sel({new_dim: slice(None, self.extend_end)})) else: - filtered.append(input_data.sel({new_dim: slice(self.offset - self.minimum_length, self.offset)})) + filtered.append(input_data.sel({new_dim: slice(self.extend_end - self.minimum_length, self.extend_end)})) self._filtered = filtered self._h = h @@ -382,7 +393,7 @@ class ClimateFIRFilter(FIRFilter): monthly_mean.sel(month=month, drop=True), monthly) # transform monthly information into original sampling rate - return monthly.resample({time_dim: sampling}).interpolate() + return monthly.dropna(dim=time_dim).resample({time_dim: sampling}).interpolate() @staticmethod def _compute_hourly_mean_per_month(data: xr.DataArray, time_dim: str, as_anomaly: bool) -> Dict[int, xr.DataArray]: @@ -422,7 +433,7 @@ class ClimateFIRFilter(FIRFilter): for month in means.keys(): hourly_mean_single_month = means[month].sel(hour=hour, drop=True) h_coll = xr.where((h_coll[f"{time_dim}.month"] == month), hourly_mean_single_month, h_coll) - h_coll = h_coll.resample({time_dim: sampling}).interpolate() + h_coll = h_coll.dropna(time_dim).resample({time_dim: sampling}).interpolate() h_coll = h_coll.sel({time_dim: (h_coll[f"{time_dim}.hour"] == hour)}) return h_coll @@ -534,9 +545,16 @@ class ClimateFIRFilter(FIRFilter): return apriori + @staticmethod + def get_forecast_run_delta(data, time_dim): + time_data = data.coords[time_dim].values + deltas = (time_data[1:] - time_data[:-1]) / np.timedelta64(1, "h") + return stats.mode(deltas).mode[0] + def combine_observation_and_apriori(self, data: xr.DataArray, apriori: xr.DataArray, time_dim: str, new_dim: str, extend_length_history: int, extend_length_future: int, - extend_length_separator: int = 0) -> xr.DataArray: + extend_length_separator: int = 0, forecasts: xr.DataArray = None, + sampling: str = "1H", extend_end: int = 0, offset: int = 0) -> xr.DataArray: """ Combine historical data / observations ("data") and climatological statistics ("apriori"). Historical data are used on interval [t0 - extend_length_history, t0] and apriori is used on [t0 + 1, t0 + extend_length_future]. 
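get_forecast_run_delta derives the typical spacing between forecast initialisation times as the mode of the consecutive time deltas in hours. A small standalone numpy/scipy sketch of the same computation on made-up init times; scipy.stats.mode returns its result as an array in older scipy versions and as a scalar in newer ones (hence the .mode[0] indexing above), so np.atleast_1d is used below to cover both:

import numpy as np
from scipy import stats

# four forecast init times with one missing run in between
time_data = np.array(["2021-01-01", "2021-01-02", "2021-01-03", "2021-01-05"],
                     dtype="datetime64[h]")
deltas = (time_data[1:] - time_data[:-1]) / np.timedelta64(1, "h")  # -> [24., 24., 48.]
run_delta = np.atleast_1d(stats.mode(deltas).mode)[0]  # most frequent spacing: 24.0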
If @@ -557,34 +575,76 @@ class ClimateFIRFilter(FIRFilter): :returns: combined data array """ # prepare historical data / observation - ext_sep = min(extend_length_separator, extend_length_future) + if forecasts is None: + ext_sep = offset + min(extend_end, extend_length_future, extend_length_separator) + else: + ext_sep = min(offset, extend_length_future) if new_dim not in data.coords: history = self._shift_data(data, range(int(-extend_length_history), ext_sep + 1), time_dim, new_dim) else: history = data.sel({new_dim: slice(int(-extend_length_history), ext_sep)}) + if forecasts is not None: + forecast_delta = self.get_forecast_run_delta(forecasts, time_dim) + forecast_end = offset + min(extend_end, extend_length_separator) + for lead_time in forecasts.coords[new_dim]: + delta = np.timedelta64(int(lead_time - offset), {"1d": "D", "1H": "h"}.get(sampling)) + forecasts_tmp = forecasts.sel({new_dim: slice(None, forecast_end + lead_time)}) + forecasts_tmp.coords[time_dim] = forecasts_tmp.coords[time_dim] + delta + forecasts_tmp.coords[new_dim] = forecasts_tmp.coords[new_dim] + offset - lead_time + history = history.combine_first(forecasts_tmp) + # history.plot() + if lead_time >= forecast_delta - 1: # stop when all gaps are filled + break + if extend_length_future > ext_sep + 1: # prepare climatological statistics if new_dim not in apriori.coords: future = self._shift_data(apriori, range(ext_sep + 1, - extend_length_future + 1), + offset + extend_length_future + 1), time_dim, new_dim) else: - future = apriori.sel({new_dim: slice(ext_sep + 1, - extend_length_future)}) + future = apriori.sel({new_dim: slice(ext_sep + 1, offset + extend_length_future)}) # combine historical data [t0-length,t0+sep] and climatological statistics [t0+sep+1,t0+length] - filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") - return filter_input_data + filter_input_data = history.combine_first(future) + # filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") + return filter_input_data.dropna(time_dim) else: return history + @staticmethod + def create_full_time_dim(data, dim, freq): + """Ensure time dimension to be equidistant. 
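The merging of history, forecasts, and apriori above relies on xarray's combine_first, which keeps the values of the calling array and only fills its gaps (NaNs or missing coordinates) from the other array. A tiny illustration, independent of the filter code; the coordinate name "datetime" mirrors the repository's time_dim convention, the values are made up:

import numpy as np
import pandas as pd
import xarray as xr

time = pd.date_range("2021-01-01", periods=4, freq="D")
obs = xr.DataArray([1.0, np.nan, 3.0, np.nan], coords={"datetime": time}, dims="datetime")
clim = xr.DataArray([9.0, 9.0, 9.0, 9.0], coords={"datetime": time}, dims="datetime")

combined = obs.combine_first(clim)  # observations win, climatology only fills the gaps
print(combined.values)  # -> [1. 9. 3. 9.]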
Sometimes dates if missing values have been dropped.""" + start = data.coords[dim].values[0] + end = data.coords[dim].values[-1] + datetime_index = pd.DataFrame(index=pd.date_range(start, end, freq=freq)) + t = data.sel({dim: start}, drop=True) + res = xr.DataArray(coords=[datetime_index.index, *[t.coords[c] for c in t.coords]], dims=[dim, *t.coords]) + res = res.transpose(*data.dims) + res.loc[data.coords] = data + return res + + def create_pseudo_timeseries(self, data, time_dim, sampling, window_dim): + empty_data = data.sel({window_dim: 0}, drop=True) * np.nan + data_full_time_index = self.create_full_time_dim(empty_data, time_dim, sampling) + for lead_time in data.coords[window_dim]: + data_tmp = data.sel({window_dim: lead_time}, drop=True) + delta = np.timedelta64(int(lead_time), {"1d": "D", "1H": "h"}.get(sampling)) + data_tmp.coords[time_dim] = data_tmp.coords[time_dim] + delta + data_full_time_index = data_full_time_index.combine_first(data_tmp) + if data_full_time_index.isnull().sum() == 0: # stop when all gaps are filled + break + return data_full_time_index + def create_visualization(self, filtered, data, filter_input_data, plot_dates, time_dim, new_dim, sampling, extend_length_history, extend_length_future, minimum_length, h, - variable_name, extend_length_opts=None, offset=None): # pragma: no cover + variable_name, extend_length_opts=None, extend_end=None, offset=None, forecast=None): # pragma: no cover + plot_data = [] - offset = 0 if offset is None else offset + extend_end = 0 if extend_end is None else extend_end extend_length_opts = 0 if extend_length_opts is None else extend_length_opts + offset = 0 if offset is None else offset for t0 in set(plot_dates).intersection(filtered.coords[time_dim].values): try: td_type = {"1d": "D", "1H": "h"}.get(sampling) @@ -598,14 +658,32 @@ class ClimateFIRFilter(FIRFilter): new_dim).sel({time_dim: t0}) else: tmp_filter_data = None + filter_input = filter_input_data.sel({time_dim: t0, new_dim: slice(None, extend_length_future)}) valid_start = int(filtered[new_dim].min()) + int((len(h) + 1) / 2) - valid_end = min(extend_length_opts + offset + 1, int(filtered[new_dim].max()) - int((len(h) + 1) / 2)) + valid_end = min(extend_length_opts + extend_end + 1, int(filtered[new_dim].max()) - int((len(h) + 1) / 2)) valid_range = range(valid_start, valid_end) - plot_data.append({"t0": t0, + # if forecast is not None: + # forecast_deltas = (t0 - forecast.coords[time_dim]) / np.timedelta64(1, "h") + 12 + # minimal_forecast_delta = int(forecast_deltas[forecast_deltas >= 0][-1]) + # init_time = forecast.coords[time_dim][forecast_deltas >= 0][-1] + # forecast_end = min(extend_end, extend_length_opts) + # f = forecast.sel({time_dim: init_time, new_dim: slice(None, forecast_end)}) + # f.coords[time_dim] = t0 + # f.coords[new_dim] = f.coords[new_dim] - minimal_forecast_delta + offset + # else: + # f = None + # correct all data for offset + def correct_data_for_offset(d): + d = d.__deepcopy__() + d.coords[time_dim] = d.coords[time_dim] + np.timedelta64(int(offset), td_type) + d.coords[new_dim] = d.coords[new_dim] - offset + return d + plot_data.append({"t0": t0 + np.timedelta64(int(offset), td_type), "var": variable_name, - "filter_input": filter_input_data.sel({time_dim: t0}), - "filter_input_nc": tmp_filter_data, - "valid_range": valid_range, + "filter_input": correct_data_for_offset(filter_input), + "filter_input_nc": correct_data_for_offset(tmp_filter_data), + "valid_range": range(valid_range.start - offset, valid_range.stop - offset), + # "forecast": f, 
"time_range": data.sel( {time_dim: slice(t_minus, t_plus - np.timedelta64(1, td_type))}).coords[ time_dim].values, @@ -649,19 +727,18 @@ class ClimateFIRFilter(FIRFilter): @staticmethod def _trim_data_to_minimum_length(data: xr.DataArray, extend_length_history: int, dim: str, - extend_length_future: int = 0) -> xr.DataArray: + extend_length_future: int = 0, offset: int = 0) -> xr.DataArray: """ Trim data along given axis between either -minimum_length (if given) or -extend_length_history and extend_length_opts (which is default set to 0). :param data: data to trim - :param extend_length_history: start number for trim range (transformed to negative), only used if parameter - minimum_length is not provided + :param extend_length_history: start number for trim range, only used if parameter minimum_length is not provided :param dim: dim to apply trim on :param extend_length_future: number to use in "future" :returns: trimmed data """ - return data.sel({dim: slice(-extend_length_history, extend_length_future)}, drop=True) + return data.sel({dim: slice(extend_length_history + offset, extend_length_future + offset)}, drop=True) @staticmethod def _create_full_filter_result_array(template_array: xr.DataArray, result_array: xr.DataArray, new_dim: str, @@ -686,7 +763,7 @@ class ClimateFIRFilter(FIRFilter): def clim_filter(self, data, fs, cutoff_high, order, apriori=None, sel_opts=None, sampling="1d", time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming", minimum_length=0, next_order=0, new_dim="window", plot_dates=None, display_name=None, - extend_opts: int = 0, offset: int = 0): + extend_opts: int = 0, extend_end: int = 0, forecasts=None, offset: int = 0): logging.debug(f"{display_name}: extend apriori") @@ -701,8 +778,8 @@ class ClimateFIRFilter(FIRFilter): length = len(h) # set data extend that is required for filtering - extend_length_history = minimum_length + int((next_order + 1) / 2) + int((length + 1) / 2) - offset - extend_length_future = offset + int((next_order + 1) / 2) + int((length + 1) / 2) + extend_length_history = minimum_length + int((next_order + 1) / 2) + int((length + 1) / 2) - extend_end + extend_length_future = extend_end + int((next_order + 1) / 2) + int((length + 1) / 2) # collect some data for visualization plot_pos = np.array([0.25, 1.5, 2.75, 4]) * 365 * fs @@ -728,12 +805,14 @@ class ClimateFIRFilter(FIRFilter): _year, sampling, max(extend_length_history, extend_length_future)) d = data.sel({var_dim: [var], time_dim: time_slice}) a = apriori.sel({var_dim: [var], time_dim: time_slice}) + f = forecasts.sel({var_dim: [var]}) if forecasts is not None else None if len(d.coords[time_dim]) == 0: # no data at all for this year continue # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] filter_input_data = self.combine_observation_and_apriori(d, a, time_dim, new_dim, extend_length_history, - extend_length_future, extend_length_separator=extend_opts) + extend_length_future, extend_length_separator=extend_opts, forecasts=f, sampling=sampling, + extend_end=extend_end, offset=offset) # select only data for current year try: @@ -751,19 +830,20 @@ class ClimateFIRFilter(FIRFilter): vectorize=True, kwargs={"h": h}, output_dtypes=[d.dtype]) # trim data if required + valid_range_start = int(filt.coords[new_dim].min() + (length + 1) / 2) valid_range_end = int(filt.coords[new_dim].max() - (length + 1) / 2) + 1 ext_len = min(extend_length_future, valid_range_end) - trimmed = self._trim_data_to_minimum_length(filt, 
extend_length_history, new_dim, + trimmed = self._trim_data_to_minimum_length(filt, valid_range_start, new_dim, extend_length_future=ext_len) filt_coll.append(trimmed) - trimmed = self._trim_data_to_minimum_length(filter_input_data, extend_length_history, new_dim, + trimmed = self._trim_data_to_minimum_length(filter_input_data, valid_range_start, new_dim, extend_length_future=ext_len) filt_input_coll.append(trimmed) # visualization plot_data.extend(self.create_visualization(filt, d, filter_input_data, plot_dates, time_dim, new_dim, sampling, extend_length_history, extend_length_future, - minimum_length, h, var, extend_opts, offset)) + minimum_length, h, var, extend_opts, extend_end, offset, f)) # collect all filter results coll.append(xr.concat(filt_coll, time_dim)) diff --git a/mlair/helpers/helpers.py b/mlair/helpers/helpers.py index 0b97f826ee34a35dc62313ed9350919a94931e62..4ebea8a042bea658e08228a96bfbfa502f071317 100644 --- a/mlair/helpers/helpers.py +++ b/mlair/helpers/helpers.py @@ -283,7 +283,8 @@ def filter_dict_by_value(dictionary: dict, filter_val: Any, filter_cond: bool) - do not match the criteria (if `False`) :returns: a filtered dict with either matching or non-matching elements depending on the `filter_cond` """ - return dict(filter(lambda x: (x[1] == filter_val) is filter_cond, dictionary.items())) + filter_val = to_list(filter_val) + return dict(filter(lambda x: (x[1] in filter_val) is filter_cond, dictionary.items())) def str2bool(v): diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py index 9415e0fcb0557f792d1cc0679868caff97e316d3..c897f875c56da820512c0de58aca34f9cf0fc4cb 100644 --- a/mlair/helpers/statistics.py +++ b/mlair/helpers/statistics.py @@ -637,3 +637,16 @@ def create_n_bootstrap_realizations(data: xr.DataArray, dim_name_time: str, dim_ realizations[season][boot] = calculate_average(shuffled.sel({dim_name_time: sel}), dim=dim_name_time, skipna=True) return realizations + + +def calculate_bias_free_data(data, time_dim="index", window_size=30): + + # overall mean + overall_mean = data.mean(time_dim) + bias_free_data = data - overall_mean + + # seasonal mean + seasonal_mean = data.rolling(**{time_dim: window_size}, min_periods=int(window_size/2), center=True).mean() + seasonal_bias_free_data = data - seasonal_mean + + return bias_free_data, seasonal_bias_free_data \ No newline at end of file diff --git a/mlair/keras_legacy/__init__.py b/mlair/keras_legacy/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/mlair/plotting/data_insight_plotting.py b/mlair/plotting/data_insight_plotting.py index d33f3abb2e2cd04399a2d98f34bf2ed06acfcb6b..76e7aadcacad88ed761c1d15480b7745dec0bd1a 100644 --- a/mlair/plotting/data_insight_plotting.py +++ b/mlair/plotting/data_insight_plotting.py @@ -13,6 +13,7 @@ import sys import numpy as np import pandas as pd import xarray as xr +import seaborn as sns import matplotlib # matplotlib.use("Agg") from matplotlib import lines as mlines, pyplot as plt, patches as mpatches, dates as mdates @@ -447,6 +448,71 @@ class PlotAvailabilityHistogram(AbstractPlotClass): # pragma: no cover plt.tight_layout() +@TimeTrackingWrapper +class PlotDataMonthlyDistribution(AbstractPlotClass): # pragma: no cover + + def __init__(self, generators: Dict[str, DataCollection], plot_folder: str = ".", variables_dim="variables", + time_dim="datetime", window_dim="window", target_var: str = "", target_var_unit: str = "ppb"): + """Set attributes and create plot.""" + 
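calculate_bias_free_data (statistics.py above) removes either the overall mean or a centred rolling mean from the data before error metrics are computed. A minimal sketch of both corrections on a toy xarray series; the 30-step window, the min_periods of half a window, and the dimension name "index" mirror the defaults above, the data itself is synthetic:

import numpy as np
import pandas as pd
import xarray as xr

time = pd.date_range("2021-01-01", periods=120, freq="D")
data = xr.DataArray(np.sin(np.arange(120) / 10.) + 5., coords={"index": time}, dims="index")

# remove the overall bias
bias_free = data - data.mean("index")

# remove the seasonal bias via a centred 30-step rolling mean (at least 15 valid points)
seasonal_mean = data.rolling(index=30, min_periods=15, center=True).mean()
seasonal_bias_free = data - seasonal_mean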
super().__init__(plot_folder, "monthly_data_distribution") + self.variables_dim = variables_dim + self.time_dim = time_dim + self.window_dim = window_dim + self.coll_dim = "coll" + self.subset_dim = "subset" + self._data = self._prepare_data(generators) + self._plot(target_var, target_var_unit) + self._save() + + def _prepare_data(self, generators) -> List[xr.DataArray]: + """ + Pre.process data required to plot. + + :param generator: data + :return: The entire data set, flagged with the corresponding month. + """ + f = lambda x: x.get_observation() + forecasts = [] + for set_type, generator in generators.items(): + forecasts_set = None + forecasts_monthly = {} + for i, gen in enumerate(generator): + data = f(gen) + data = gen.apply_transformation(data, inverse=True) + data = data.clip(min=0).reset_coords(drop=True) + new_index = data.coords[self.time_dim].values.astype("datetime64[M]").astype(int) % 12 + 1 + data = data.assign_coords({self.time_dim: new_index}) + forecasts_set = xr.concat([forecasts_set, data], self.time_dim) if forecasts_set is not None else data + for month in set(forecasts_set.coords[self.time_dim].values): + monthly_values = forecasts_set.sel({self.time_dim: month}).values + forecasts_monthly[month] = np.concatenate((forecasts_monthly.get(month, []), monthly_values)) + forecasts_monthly = pd.DataFrame.from_dict(forecasts_monthly, orient="index")#.transpose() + forecasts_monthly[self.coll_dim] = set_type + forecasts.append(forecasts_monthly.set_index(self.coll_dim, append=True)) + forecasts = pd.concat(forecasts).stack().rename_axis(["month", "subset", "index"]) + forecasts = forecasts.to_frame(name="values").reset_index(level=[0, 1]) + return forecasts + + @staticmethod + def _spell_out_chemical_concentrations(short_name: str, add_concentration: bool = False): + short2long = {'o3': 'ozone', 'no': 'nitrogen oxide', 'no2': 'nitrogen dioxide', 'nox': 'nitrogen dioxides'} + _suffix = "" if add_concentration is False else " concentration" + return f"{short2long[short_name]}{_suffix}" + + def _plot(self, target_var: str, target_var_unit: str): + """ + Create a monthly grouped box plot over all stations but with separate boxes for each lead time step. 
+ + :param target_var: display name of the target variable on plot's axis + """ + ax = sns.boxplot(data=self._data, x="month", y="values", hue="subset", whis=1.5, + palette=self.get_dataset_colors(), flierprops={'marker': '.', 'markersize': 1}, showmeans=True, + meanprops={'markersize': 1, 'markeredgecolor': 'k'}) + ylabel = self._spell_out_chemical_concentrations(target_var) + ax.set(xlabel='month', ylabel=f'dma8 {ylabel} (in {target_var_unit})') + plt.tight_layout() + + @TimeTrackingWrapper class PlotDataHistogram(AbstractPlotClass): # pragma: no cover """ @@ -1027,6 +1093,9 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover # clim apriori self._plot_apriori(ax, time_axis, filter_input, new_dim, ifilter, offset=1) + # get ax lims + ylims = ax.get_ylim() + # clim filter response residuum_estimated = self._plot_clim_filter(ax, time_axis, filter_input, new_dim, h, output_dtypes=filter_input.dtype) @@ -1037,6 +1106,7 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover # set title, legend, and save plot xlims = self._set_xlim(ax, t0, filter_order, valid_range, td_type, time_axis) + ax.set_ylim(ylims) plt.title(f"Input of ClimFilter ({str(var)})") plt.legend() @@ -1052,6 +1122,7 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover self._plot_series(ax, time_axis, residuum_true.values.flatten(), style="ideal") self._plot_series(ax, time_axis, residuum_estimated.values.flatten(), style="clim") ax.set_xlim(xlims) + self._set_ylim_by_valid_range(ax, residuum_true, residuum_estimated, new_dim, valid_range) plt.title(f"Residuum of ClimFilter ({str(var)})") plt.legend(loc="upper left") fig.autofmt_xdate() @@ -1063,6 +1134,16 @@ class PlotClimateFirFilter(AbstractPlotClass): # pragma: no cover logging.info(f"Could not create plot because of:\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") pass + @staticmethod + def _set_ylim_by_valid_range(ax, a, b, dim, valid_range): + ymax = max(a.sel({dim: valid_range}).max(), + b.sel({dim: valid_range}).max()) + ymin = min(a.sel({dim: valid_range}).min(), + b.sel({dim: valid_range}).min()) + ymax = 1.1 * ymax if ymax > 0 else 0.9 * ymax + ymin = 0.9 * ymin if ymin > 0 else 1.1 * ymin + ax.set_ylim((ymin, ymax)) + def _set_xlim(self, ax, t0, order, valid_range, td_type, time_axis): """ Set xlims diff --git a/mlair/reference_models/reference_model_cams.py b/mlair/reference_models/reference_model_cams.py index 1db19c05a846ec948d3eda71727d11dd597643fa..4c920cff63d41d18810413c132080a8a9a31aaff 100644 --- a/mlair/reference_models/reference_model_cams.py +++ b/mlair/reference_models/reference_model_cams.py @@ -11,7 +11,16 @@ import pandas as pd class CAMSforecast(AbstractReferenceModel): - def __init__(self, ref_name: str, ref_store_path: str = None, data_path: str = None): + def __init__(self, ref_name: str, ref_store_path: str = None, data_path: str = None, interp_method: str = None): + """ + Use parameters `cams_data_path` to set `data_path` and `cams_interp_method` to set `interp_method` in MLAir + run script. 
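With the new interp_method parameter, CAMS fields are no longer restricted to the nearest grid point; any interpolation method accepted by xarray's interp can be used instead. A hedged sketch of the difference on a toy longitude/latitude field (coordinate names follow the CAMS reader above, the values are made up; interp requires scipy):

import numpy as np
import xarray as xr

field = xr.DataArray(np.arange(9.0).reshape(3, 3),
                     coords={"longitude": [6.0, 7.0, 8.0], "latitude": [50.0, 51.0, 52.0]},
                     dims=("longitude", "latitude"))

lon, lat = 6.4, 50.7
nearest = field.sel(longitude=lon, latitude=lat, method="nearest")   # value of the closest grid point
linear = field.interp(longitude=lon, latitude=lat, method="linear")  # bilinear interpolation
print(float(nearest), float(linear))  # -> 1.0 1.9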
+ + :param ref_name: + :param ref_store_path: + :param data_path: + :param interp_method: + """ super().__init__() self.ref_name = ref_name @@ -22,6 +31,7 @@ class CAMSforecast(AbstractReferenceModel): self.data_path = os.path.abspath(".") else: self.data_path = os.path.abspath(data_path) + self.interp_method = interp_method self.file_pattern = "forecasts_%s_test.nc" self.time_dim = "index" self.ahead_dim = "ahead" @@ -36,7 +46,11 @@ class CAMSforecast(AbstractReferenceModel): darray = dataset.to_array().sortby(["longitude", "latitude"]) for station, coords in missing_stations.items(): lon, lat = coords["lon"], coords["lat"] - station_data = darray.sel(longitude=lon, latitude=lat, method="nearest", drop=True).squeeze(drop=True) + if self.interp_method is None: + station_data = darray.sel(longitude=lon, latitude=lat, method="nearest", drop=True).squeeze(drop=True) + else: + station_data = darray.interp(**{"longitude": lon, "latitude": lat}, method=self.interp_method) + station_data = station_data.drop_vars(["longitude", "latitude"]).squeeze(drop=True) station_data = station_data.expand_dims(dim={self.type_dim: [self.ref_name]}).compute() station_data.coords[self.time_dim] = station_data.coords[self.time_dim] - pd.Timedelta(days=1) station_data.coords[self.ahead_dim] = station_data.coords[self.ahead_dim] + 1 diff --git a/mlair/run_modules/experiment_setup.py b/mlair/run_modules/experiment_setup.py index f89633cbe0f80f26dbb2481ca24a7fd294ee6888..c12664e356d30bfc23333c3888d750bf245f4564 100644 --- a/mlair/run_modules/experiment_setup.py +++ b/mlair/run_modules/experiment_setup.py @@ -24,7 +24,8 @@ from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_FEATURE_IMPORTANCE_BOOTSTRAP_TYPE, DEFAULT_FEATURE_IMPORTANCE_BOOTSTRAP_METHOD, DEFAULT_OVERWRITE_LAZY_DATA, \ DEFAULT_UNCERTAINTY_ESTIMATE_BLOCK_LENGTH, DEFAULT_UNCERTAINTY_ESTIMATE_EVALUATE_COMPETITORS, \ DEFAULT_UNCERTAINTY_ESTIMATE_N_BOOTS, DEFAULT_DO_UNCERTAINTY_ESTIMATE, DEFAULT_CREATE_SNAPSHOT, \ - DEFAULT_EARLY_STOPPING_EPOCHS, DEFAULT_RESTORE_BEST_MODEL_WEIGHTS, DEFAULT_COMPETITORS + DEFAULT_EARLY_STOPPING_EPOCHS, DEFAULT_RESTORE_BEST_MODEL_WEIGHTS, DEFAULT_COMPETITORS, \ + DEFAULT_DO_BIAS_FREE_EVALUATION from mlair.data_handler import DefaultDataHandler from mlair.run_modules.run_environment import RunEnvironment from mlair.model_modules.fully_connected_networks import FCN_64_32_16 as VanillaModel @@ -240,9 +241,10 @@ class ExperimentSetup(RunEnvironment): max_number_multiprocessing: int = None, start_script: Union[Callable, str] = None, overwrite_lazy_data: bool = None, uncertainty_estimate_block_length: str = None, uncertainty_estimate_evaluate_competitors: bool = None, uncertainty_estimate_n_boots: int = None, - do_uncertainty_estimate: bool = None, model_display_name: str = None, transformation_file: str = None, + do_uncertainty_estimate: bool = None, do_bias_free_evaluation: bool = None, + model_display_name: str = None, transformation_file: str = None, calculate_fresh_transformation: bool = None, snapshot_load_path: str = None, - create_snapshot: bool = None, snapshot_path: str = None, **kwargs): + create_snapshot: bool = None, snapshot_path: str = None, model_path: str = None, **kwargs): # create run framework super().__init__() @@ -297,6 +299,7 @@ class ExperimentSetup(RunEnvironment): self._set_param("batch_path", batch_path, default=os.path.join(experiment_path, "batch_data")) # set model path + self._set_param("model_load_path", model_path) self._set_param("model_path", None, 
os.path.join(experiment_path, "model")) path_config.check_path_and_create(self.data_store.get("model_path")) @@ -393,6 +396,8 @@ class ExperimentSetup(RunEnvironment): default=DEFAULT_UNCERTAINTY_ESTIMATE_EVALUATE_COMPETITORS, scope="uncertainty_estimate") self._set_param("n_boots", uncertainty_estimate_n_boots, default=DEFAULT_UNCERTAINTY_ESTIMATE_N_BOOTS, scope="uncertainty_estimate") + self._set_param("do_bias_free_evaluation", do_bias_free_evaluation, + default=DEFAULT_DO_BIAS_FREE_EVALUATION, scope="general.postprocessing") self._set_param("evaluate_feature_importance", evaluate_feature_importance, default=DEFAULT_EVALUATE_FEATURE_IMPORTANCE, scope="general.postprocessing") diff --git a/mlair/run_modules/model_setup.py b/mlair/run_modules/model_setup.py index efeff06260d1df41d4a83592c225f316574e77eb..3933d8d6de6288ff6edb235c135b4994c95c0098 100644 --- a/mlair/run_modules/model_setup.py +++ b/mlair/run_modules/model_setup.py @@ -6,6 +6,8 @@ __date__ = '2019-12-02' import logging import os import re +import shutil + from dill.source import getsource import tensorflow.keras as keras @@ -58,15 +60,16 @@ class ModelSetup(RunEnvironment): """Initialise and run model setup.""" super().__init__() self.model = None - exp_name = self.data_store.get("experiment_name") - self.path = self.data_store.get("model_path") self.scope = "model" - path = os.path.join(self.path, f"{exp_name}_%s") - self.model_name = path % "%s.h5" - self.checkpoint_name = path % "model-best.h5" - self.callbacks_name = path % "model-best-callbacks-%s.pickle" self._train_model = self.data_store.get("train_model") self._create_new_model = self.data_store.get("create_new_model") + self.path = self.data_store.get("model_path") + self.model_display_name = self.data_store.get_default("model_display_name", default=None) + self.model_load_path = None + path = self._set_model_path() + self.model_path = path % "%s.h5" + self.checkpoint_name = path % "model-best.h5" + self.callbacks_name = path % "model-best-callbacks-%s.pickle" self._run() def _run(self): @@ -96,6 +99,20 @@ class ModelSetup(RunEnvironment): # report settings self.report_model() + def _set_model_path(self): + exp_name = self.data_store.get("experiment_name") + self.model_load_path = self.data_store.get_default("model_load_path", default=None) + if self.model_load_path is not None: + if not self.model_load_path.endswith(".h5"): + raise FileNotFoundError(f"When providing external models, you need to provide full path including the " + f".h5 file. Given path is not valid: {self.model_load_path}") + if any([self._train_model, self._create_new_model]): + raise ValueError(f"Providing `model_path` along with parameters train_model={self._train_model} and " + f"create_new_model={self._create_new_model} is not possible. Either set both " + f"parameters to False or remove `model_path` parameter. 
Given was: model_path = " + f"{self.model_load_path}") + return os.path.join(self.path, f"{exp_name}_%s") + def _set_shapes(self): """Set input and output shapes from train collection.""" shape = list(map(lambda x: x.shape[1:], self.data_store.get("data_collection", "train")[0].get_X())) @@ -159,11 +176,18 @@ class ModelSetup(RunEnvironment): # store callbacks self.data_store.set("callbacks", callbacks, self.scope) + def copy_model(self): + """Copy external model to internal experiment structure.""" + if self.model_load_path is not None: + logging.info(f"Copy external model file: {self.model_load_path} -> {self.model_path}") + shutil.copyfile(self.model_load_path, self.model_path) + def load_model(self): """Try to load model from disk or skip if not possible.""" + self.copy_model() try: - self.model.load_model(self.model_name) - logging.info(f"reload model {self.model_name} from disk ...") + self.model.load_model(self.model_path) + logging.info(f"reload model {self.model_path} from disk ...") except OSError: logging.info('no local model to load...') @@ -195,14 +219,15 @@ class ModelSetup(RunEnvironment): """Load all model settings and store in data store.""" model_settings = self.model.get_settings() self.data_store.set_from_dict(model_settings, self.scope, log=True) - self.model_name = self.model_name % self.data_store.get_default("model_name", self.scope, "my_model") - self.data_store.set("model_name", self.model_name, self.scope) + generic_model_name = self.data_store.get_default("model_name", self.scope, "my_model") + self.model_path = self.model_path % generic_model_name + self.data_store.set("model_name", self.model_path, self.scope) def plot_model(self): # pragma: no cover """Plot model architecture as `<model_name>.pdf`.""" try: with tf.device("/cpu:0"): - file_name = f"{self.model_name.rsplit('.', 1)[0]}.pdf" + file_name = f"{self.model_path.rsplit('.', 1)[0]}.pdf" keras.utils.plot_model(self.model, to_file=file_name, show_shapes=True, show_layer_names=True) except Exception as e: logging.info(f"Can not plot model due to: {e}") diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py index e4cc34f66db7426876209fece0dc902341ac6582..5bd710d5200564b6f0335e520ecb1d2048df8725 100644 --- a/mlair/run_modules/post_processing.py +++ b/mlair/run_modules/post_processing.py @@ -27,7 +27,7 @@ from mlair.plotting.postprocessing_plotting import PlotMonthlySummary, PlotClima PlotSeparationOfScales, PlotSampleUncertaintyFromBootstrap, PlotTimeEvolutionMetric, PlotSeasonalMSEStack, \ PlotErrorsOnMap from mlair.plotting.data_insight_plotting import PlotStationMap, PlotAvailability, PlotAvailabilityHistogram, \ - PlotPeriodogram, PlotDataHistogram + PlotPeriodogram, PlotDataHistogram, PlotDataMonthlyDistribution from mlair.run_modules.run_environment import RunEnvironment @@ -89,6 +89,7 @@ class PostProcessing(RunEnvironment): self.window_lead_time = extract_value(self.data_store.get("output_shape", "model")) self.skill_scores = None self.errors = None + self.bias_free_errors = None self.feature_importance_skill_scores = None self.uncertainty_estimate = None self.uncertainty_estimate_seasons = {} @@ -147,7 +148,14 @@ class PostProcessing(RunEnvironment): self.report_error_metrics(errors) self.report_error_metrics({self.forecast_indicator: skill_score_climatological}) self.report_error_metrics({"skill_score": skill_score_competitive}) - self.store_errors(errors) + self.errors = self.store_errors(errors) + + # bias free evaluation + if 
self.data_store.get("do_bias_free_evaluation", "postprocessing") is True: + bias_free_errors = self.calculate_bias_free_error_metrics() + self.report_error_metrics(bias_free_errors[0], tag="bias_free") + self.report_error_metrics(bias_free_errors[1], tag="seasonal_bias_free") + self.bias_free_errors = [self.store_errors(bias_free_errors[0]), self.store_errors(bias_free_errors[1])] # plotting self.plot() @@ -658,6 +666,29 @@ class PostProcessing(RunEnvironment): logging.error(f"Could not create plot PlotErrorMetrics due to the following error: {e}" f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") + try: + if "PlotErrorMetrics" in plot_list and self.bias_free_errors is not None: + error_metric_units = statistics.get_error_metrics_units("ppb") + error_metrics_name = statistics.get_error_metrics_long_name() + tag = {0: "", 1: "seasonal_"} + for i, errors in enumerate(self.bias_free_errors): + for error_metric in errors.keys(): + try: + PlotSampleUncertaintyFromBootstrap( + data=errors[error_metric], plot_folder=self.plot_path, + model_type_dim=self.model_type_dim, + dim_name_boots="station", error_measure=error_metrics_name[error_metric], + error_unit=error_metric_units[error_metric], model_name=self.model_display_name, + model_indicator=self.model_display_name, sampling=self._sampling, apply_root=False, + plot_name=f"{tag[i]}bias_free_error_plot_{error_metric}") + except Exception as e: + logging.error(f"Could not create plot PlotErrorMetrics for {error_metric} (bias free " + f"{tag[i]}) due to the following error: " + f"{e}\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") + except Exception as e: + logging.error(f"Could not create plot PlotErrorMetrics (bias free) due to the following error: {e}" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") + try: if "PlotStationMap" in plot_list: gens = [(self.train_data, {"marker": 5, "ms": 9}), @@ -691,6 +722,16 @@ class PostProcessing(RunEnvironment): logging.error(f"Could not create plot PlotAvailabilityHistogram due to the following error: {e}" f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") + try: + if "PlotDataMonthlyDistribution" in plot_list: + avail_data = {"train": self.train_data, "val": self.val_data, "test": self.test_data} + PlotDataMonthlyDistribution(avail_data, plot_folder=self.plot_path, time_dim=time_dim, + variables_dim=target_dim, window_dim=window_dim, target_var=self.target_var, + ) + except Exception as e: + logging.error(f"Could not create plot PlotDataMonthlyDistribution due to the following error:" + f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") + try: if "PlotDataHistogram" in plot_list: upsampling = self.data_store.get_default("upsampling", scope="train", default=False) @@ -746,10 +787,6 @@ class PostProcessing(RunEnvironment): logging.error(f"Could not create plot PlotErrorsOnMap due to the following error: {e}" f"\n{sys.exc_info()[0]}\n{sys.exc_info()[1]}\n{sys.exc_info()[2]}") - - - - @TimeTrackingWrapper def calculate_test_score(self): """Evaluate test score of model and save locally.""" @@ -1067,6 +1104,46 @@ class PostProcessing(RunEnvironment): except (TypeError, AttributeError): return forecast if competitor is None else competitor + def calculate_bias_free_error_metrics(self): + all_stations = self.data_store.get("stations") + errors = [{}, {}] # errors_total_bias_free, errors_seasonal_bias_free + for station in all_stations: + external_data = self._get_external_data(station, self.forecast_path) # test data + + # test 
errors + if external_data is not None: + external_data.coords[self.model_type_dim] = [{self.forecast_indicator: self.model_display_name}.get(n, n) + for n in external_data.coords[self.model_type_dim].values] + model_type_list = external_data.coords[self.model_type_dim].values.tolist() + + # load competitors + competitor = self.load_competitors(station) + combined = self._combine_forecasts(external_data, competitor, dim=self.model_type_dim) + if combined is not None: + model_list = remove_items(combined.coords[self.model_type_dim].values.tolist(), + self.observation_indicator) + else: + model_list = None + continue + + # data_total_bias_free, data_seasonal_bias_free + bias_free_data = statistics.calculate_bias_free_data(combined, time_dim=self.index_dim, window_size=30) + + # test errors of competitors + for model_type in (model_list or []): + for data, e in zip(bias_free_data, errors): + if self.observation_indicator not in data.coords[self.model_type_dim]: + continue + if model_type not in e.keys(): + e[model_type] = {} + e[model_type][station] = statistics.calculate_error_metrics( + *map(lambda x: data.sel(**{self.model_type_dim: x}), + [model_type, self.observation_indicator]), dim=self.index_dim) + for e in errors: + for model_type in e.keys(): + e[model_type].update({"total": self.calculate_average_errors(e[model_type])}) + return errors + def calculate_error_metrics(self) -> Tuple[Dict, Dict, Dict, Dict]: """ Calculate error metrics and skill scores of NN forecast. @@ -1177,9 +1254,10 @@ class PostProcessing(RunEnvironment): file_name = "feature_importance_skill_score_report_raw.csv" df.to_csv(os.path.join(report_path, file_name), sep=";") - def report_error_metrics(self, errors): + def report_error_metrics(self, errors, tag=None): report_path = os.path.join(self.data_store.get("experiment_path"), "latex_report") path_config.check_path_and_create(report_path) + base_file_name = "error_report" if tag is None else f"error_report_{tag}" for model_type in errors.keys(): metric_collection = {} for station, station_errors in errors[model_type].items(): @@ -1207,9 +1285,9 @@ class PostProcessing(RunEnvironment): df.reindex(df.index.drop(["total"]).to_list() + ["total"], ) column_format = tables.create_column_format_for_tex(df) if model_type == "skill_score": - file_name = f"error_report_{model_type}_{metric}.%s".replace(' ', '_').replace('/', '_') + file_name = f"{base_file_name}_{model_type}_{metric}.%s".replace(' ', '_').replace('/', '_') else: - file_name = f"error_report_{metric}_{model_type}.%s".replace(' ', '_').replace('/', '_') + file_name = f"{base_file_name}_{metric}_{model_type}.%s".replace(' ', '_').replace('/', '_') tables.save_to_tex(report_path, file_name % "tex", column_format=column_format, df=df) tables.save_to_md(report_path, file_name % "md", df=df) @@ -1226,4 +1304,4 @@ class PostProcessing(RunEnvironment): metric_collection[model_type] = xr.Dataset(station_collection).to_array(station_dim) coll = xr.Dataset(metric_collection).to_array(self.model_type_dim) coll = coll.transpose(station_dim, self.ahead_dim, self.model_type_dim, error_dim) - self.errors = {k: coll.sel({error_dim: k}, drop=True) for k in coll.coords[error_dim].values} + return {k: coll.sel({error_dim: k}, drop=True) for k in coll.coords[error_dim].values} diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py index fc1ae4b7ad63a51b623aacb3d846d33ca3a482e0..dffabffe00ddb9d03af08be0cef45df58495d7d4 100644 --- a/mlair/run_modules/pre_processing.py +++ 
b/mlair/run_modules/pre_processing.py @@ -18,7 +18,7 @@ import pandas as pd from mlair.data_handler import DataCollection, AbstractDataHandler from mlair.helpers import TimeTracking, to_list, tables, remove_items from mlair.configuration import path_config -from mlair.helpers.data_sources.toar_data import EmptyQueryResult +from mlair.helpers.data_sources.data_loader import EmptyQueryResult from mlair.helpers.testing import check_nested_equality from mlair.run_modules.run_environment import RunEnvironment @@ -64,6 +64,7 @@ class PreProcessing(RunEnvironment): if snapshot_load_path is None: stations = self.data_store.get("stations") data_handler = self.data_store.get("data_handler") + self._load_apriori() _, valid_stations = self.validate_station(data_handler, stations, "preprocessing") # , store_processed_data=False) if len(valid_stations) == 0: @@ -294,11 +295,12 @@ class PreProcessing(RunEnvironment): else: # serial solution logging.info("use serial validate station approach") kwargs.update({"return_strategy": "result"}) - for station in set_stations: + for i, station in enumerate(set_stations): dh, s = f_proc(data_handler, station, set_name, store_processed_data, **kwargs) if dh is not None: collection.add(dh) valid_stations.append(s) + logging.info(f"...finished: {s} ({int((i + 1.) / len(set_stations) * 100)}%)") logging.info(f"run for {t_outer} to check {len(set_stations)} station(s). Found {len(collection)}/" f"{len(set_stations)} valid stations ({set_name}).") @@ -318,6 +320,30 @@ class PreProcessing(RunEnvironment): attrs[k] = dict(attrs.get(k, {}), **{station: v}) for k, v in attrs.items(): self.data_store.set(k, v) + self._store_apriori() + + def _store_apriori(self): + apriori = self.data_store.get_default("apriori", default=None) + if apriori: + experiment_path = self.data_store.get("experiment_path") + path = os.path.join(experiment_path, "data", "apriori") + store_file = os.path.join(path, "apriori.pickle") + if not os.path.exists(path): + path_config.check_path_and_create(path) + with open(store_file, "wb") as f: + dill.dump(apriori, f, protocol=4) + logging.debug(f"Store apriori options locally for later use at: {store_file}") + + def _load_apriori(self): + if self.data_store.get_default("apriori", default=None) is None: + apriori_file = self.data_store.get_default("apriori_file", None) + if apriori_file is not None: + if os.path.exists(apriori_file): + logging.info(f"use apriori data from given file: {apriori_file}") + with open(apriori_file, "rb") as pickle_file: + self.data_store.set("apriori", dill.load(pickle_file)) + else: + logging.info(f"cannot load apriori file: {apriori_file}. 
Use fresh calculation from data.") def transformation(self, data_handler: AbstractDataHandler, stations): calculate_fresh_transformation = self.data_store.get_default("calculate_fresh_transformation", True) @@ -378,13 +404,23 @@ class PreProcessing(RunEnvironment): elif competitor_name.lower() == "CAMS".lower(): logging.info("Prepare CAMS forecasts") from mlair.reference_models.reference_model_cams import CAMSforecast + interp_method = self.data_store.get_default("cams_interp_method", default=None) data_path = self.data_store.get_default("cams_data_path", default=None) path = os.path.join(self.data_store.get("competitor_path"), competitor_name) stations = {} for subset in ["train", "val", "test"]: data_collection = self.data_store.get("data_collection", subset) stations.update({str(s): s.get_coordinates() for s in data_collection if s not in stations}) - CAMSforecast("CAMS", ref_store_path=path, data_path=data_path).make_reference_available_locally(stations) + if interp_method is None: + CAMSforecast("CAMS", ref_store_path=path, data_path=data_path, interp_method=None + ).make_reference_available_locally(stations) + else: + competitors = remove_items(competitors, "CAMS") + for method in to_list(interp_method): + CAMSforecast(f"CAMS{method}", ref_store_path=path + method, data_path=data_path, + interp_method=method).make_reference_available_locally(stations) + competitors.append(f"CAMS{method}") + self.data_store.set("competitors", competitors) else: logging.info(f"No preparation required for competitor {competitor_name} as no specific instruction " f"is provided.") @@ -424,8 +460,9 @@ class PreProcessing(RunEnvironment): "model_class", "model_display_name", "model_path", "n_boots", "n_hidden", "n_layer", "neighbors", "plot_list", "plot_path", "regularizer", "restore_best_model_weights", "snapshot_load_path", "snapshot_path", "stations", "tmp_path", "train_model", - "transformation", "use_multiprocessing", ] - + "transformation", "use_multiprocessing", "cams_data_path", "cams_interp_method", + "do_bias_free_evaluation", "apriori_file", "model_path", "model_load_path", "era5_data_path", + "era5_file_names", "ifs_data_path", "ifs_file_names"] data_handler = self.data_store.get("data_handler") model_class = self.data_store.get("model_class") excluded_params = list(set(excluded_params + data_handler.store_attributes() + model_class.requirements())) diff --git a/requirements.txt b/requirements.txt index e6578031a037ce89054db061c2582dea4d91fdca..fd0dfe8d8ca4790a60d45de3b4466992107df2c1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,6 +12,7 @@ netcdf4==1.6.0 numpy~=1.19.2 pandas==1.3.4 partd==1.2.0 +protobuf==3.20.* psutil==5.9.1 pydot==1.4.2 pytest==6.2.2 diff --git a/test/test_configuration/test_defaults.py b/test/test_configuration/test_defaults.py index b46590290eff09ac98d549c7d38010eb5506d09c..4234b7881d2c28f0a5caef3bf428efe8403412e4 100644 --- a/test/test_configuration/test_defaults.py +++ b/test/test_configuration/test_defaults.py @@ -75,4 +75,6 @@ class TestAllDefaults: assert DEFAULT_PLOT_LIST == ["PlotMonthlySummary", "PlotStationMap", "PlotClimatologicalSkillScore", "PlotTimeSeries", "PlotCompetitiveSkillScore", "PlotFeatureImportanceSkillScore", "PlotConditionalQuantiles", "PlotAvailability", "PlotAvailabilityHistogram", - "PlotDataHistogram", "PlotPeriodogram", "PlotSampleUncertaintyFromBootstrap"] + "PlotDataHistogram", "PlotPeriodogram", "PlotSampleUncertaintyFromBootstrap", + "PlotErrorMetrics", "PlotDataMonthlyDistribution", "PlotTimeEvolutionMetric", + 
"PlotSeasonalMSEStack", "PlotErrorsOnMap"] diff --git a/test/test_helpers/test_data_sources/test_join.py b/test/test_helpers/test_data_sources/test_join.py index f9b12f5a7ff20e898695de0a0f035bed023674f2..edf69e203c69a1bdd0c99c8b98cebb1ea5680fb1 100644 --- a/test/test_helpers/test_data_sources/test_join.py +++ b/test/test_helpers/test_data_sources/test_join.py @@ -7,7 +7,7 @@ from mlair.helpers.data_sources.join import _save_to_pandas, _lower_list, _selec _select_distinct_data_origin, _select_distinct_network from mlair.configuration.join_settings import join_settings from mlair.helpers.testing import check_nested_equality -from mlair.helpers.data_sources.toar_data import EmptyQueryResult +from mlair.helpers.data_sources.data_loader import EmptyQueryResult class TestDownloadJoin: diff --git a/test/test_helpers/test_data_sources/test_toar_data.py b/test/test_helpers/test_data_sources/test_toar_data.py index abaec10cc580b592d85d7dcc842616c67777f174..31af052ef9cfbe297f24ab167f8b06370a2239b7 100644 --- a/test/test_helpers/test_data_sources/test_toar_data.py +++ b/test/test_helpers/test_data_sources/test_toar_data.py @@ -1,5 +1,5 @@ from mlair.configuration.join_settings import join_settings -from mlair.helpers.data_sources.toar_data import get_data, create_url, correct_stat_name +from mlair.helpers.data_sources.data_loader import get_data, correct_stat_name, create_url class TestGetData: diff --git a/test/test_helpers/test_filter.py b/test/test_helpers/test_filter.py index e4bfb6890936d13137ebb6dda01a44eed0166ae5..5df3b9e48c4d530961d69ebad44331acd487cc9c 100644 --- a/test/test_helpers/test_filter.py +++ b/test/test_helpers/test_filter.py @@ -178,8 +178,8 @@ class TestClimateFIRFilter: obj = object.__new__(ClimateFIRFilter) sel_opts = {time_dim: slice("2010-05", "2010-08")} res = obj.create_monthly_mean(xr_array_long, time_dim, sel_opts=sel_opts) - assert res.dropna(time_dim)[f"{time_dim}.month"].min() == 5 - assert res.dropna(time_dim)[f"{time_dim}.month"].max() == 8 + assert res.dropna(time_dim)[f"{time_dim}.month"].min() == 1 + assert res.dropna(time_dim)[f"{time_dim}.month"].max() == 12 mean_jun_2010 = xr_array_long[xr_array_long[f"{time_dim}.month"] == 6].sel({time_dim: "2010"}).mean() assert res.sel({time_dim: "2010-06-15T00:00:00"}) == mean_jun_2010 @@ -226,8 +226,8 @@ class TestClimateFIRFilter: xr_array_long = xr_array_long.resample({time_dim: "1H"}).interpolate() sel_opts = {time_dim: slice("2010-05", "2010-08")} res = obj.create_seasonal_hourly_mean(xr_array_long, time_dim, sel_opts=sel_opts) - assert res.dropna(time_dim)[f"{time_dim}.month"].min() == 5 - assert res.dropna(time_dim)[f"{time_dim}.month"].max() == 8 + assert res.dropna(time_dim)[f"{time_dim}.month"].min() == 1 + assert res.dropna(time_dim)[f"{time_dim}.month"].max() == 12 def test_create_unity_array(self, xr_array, time_dim): obj = object.__new__(ClimateFIRFilter) @@ -294,13 +294,13 @@ class TestClimateFIRFilter: def test_trim_data_to_minimum_length(self, xr_array, time_dim): obj = object.__new__(ClimateFIRFilter) xr_array = obj._shift_data(xr_array, range(-20, 1), time_dim, new_dim="window") - res = obj._trim_data_to_minimum_length(xr_array, 5, "window") + res = obj._trim_data_to_minimum_length(xr_array, -5, "window") assert xr_array.shape == (21, 100, 1) assert res.shape == (6, 100, 1) - res = obj._trim_data_to_minimum_length(xr_array, 30, "window") + res = obj._trim_data_to_minimum_length(xr_array, -30, "window") assert res.shape == (21, 100, 1) xr_array = obj._shift_data(xr_array.sel(window=0), range(-20, 5), 
time_dim, new_dim="window") - res = obj._trim_data_to_minimum_length(xr_array, 5, "window", extend_length_future=2) + res = obj._trim_data_to_minimum_length(xr_array, -5, "window", extend_length_future=2) assert res.shape == (8, 100, 1) def test_create_full_filter_result_array(self, xr_array, time_dim): @@ -321,7 +321,7 @@ class TestClimateFIRFilter: assert len(res) == 5 # check filter data properties - assert res[0].shape == (*xr_array_long_with_var.shape, int(filter_order+1)/2 + 24 + 2) + assert res[0].shape == (*xr_array_long_with_var.shape, 24 + 2) assert res[0].dims == (*xr_array_long_with_var.dims, "window") # check filter properties @@ -351,7 +351,7 @@ class TestClimateFIRFilter: var_dim=var_dim, new_dim="total_new_dim", window=("kaiser", 5), minimum_length=1000, apriori=apriori, plot_dates=plot_dates) - assert res[0].shape == (*xr_array_long_with_var.shape, int(10 * 24 + 1 + 1) / 2 + 1000 + 2) + assert res[0].shape == (*xr_array_long_with_var.shape, 1000 + 2) assert res[0].dims == (*xr_array_long_with_var.dims, "total_new_dim") assert np.testing.assert_almost_equal( res[2], obj._calculate_filter_coefficients(("kaiser", 5), filter_order, 0.05, 24)) is None diff --git a/test/test_run_modules/test_model_setup.py b/test/test_run_modules/test_model_setup.py index 295954a7cf53927939d50de5a84d474cb1818026..aefbcb04d8f34ac44081e3dae45b08c9283214cc 100644 --- a/test/test_run_modules/test_model_setup.py +++ b/test/test_run_modules/test_model_setup.py @@ -27,7 +27,7 @@ class TestModelSetup: obj.data_store.set("lr_decay", "dummy_str", "general.model") obj.data_store.set("hist", "dummy_str", "general.model") obj.data_store.set("epochs", 2) - obj.model_name = "%s.h5" + obj.model_path = "%s.h5" yield obj RunEnvironment().__del__() diff --git a/test/test_run_modules/test_pre_processing.py b/test/test_run_modules/test_pre_processing.py index 9debec9f04583b401698cea2f79f78b523fe7927..a0ffb1cb0d2d57471966ad1be83b5d8f53ebeec2 100644 --- a/test/test_run_modules/test_pre_processing.py +++ b/test/test_run_modules/test_pre_processing.py @@ -28,13 +28,14 @@ class TestPreProcessing: @pytest.fixture def obj_with_exp_setup(self): - ExperimentSetup(stations=['DEBW107', 'DEBW013', 'DEBW087', 'DEBW99X'], - statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}, station_type="background", - data_origin={'o3': 'UBA', 'temp': 'UBA'}, data_handler=DefaultDataHandler) - pre = object.__new__(PreProcessing) - super(PreProcessing, pre).__init__() - yield pre - RunEnvironment().__del__() + with RunEnvironment(): + ExperimentSetup(stations=['DEBW107', 'DEBW013', 'DEBW087', 'DEBW99X'], + statistics_per_var={'o3': 'dma8eu', 'temp': 'maximum'}, station_type="background", + data_origin={'o3': 'UBA', 'temp': 'UBA'}, data_handler=DefaultDataHandler) + pre = object.__new__(PreProcessing) + super(PreProcessing, pre).__init__() + yield pre + # RunEnvironment().__del__() def test_init(self, caplog): ExperimentSetup(stations=['DEBW107', 'DEBW013', 'DEBW087'],