diff --git a/mlair/configuration/ifs_settings.py b/mlair/configuration/ifs_settings.py
new file mode 100644
index 0000000000000000000000000000000000000000..f0e8ac49093f2aac30c0a6318d32977ed06506e2
--- /dev/null
+++ b/mlair/configuration/ifs_settings.py
@@ -0,0 +1,19 @@
+"""Settings to access non-public IFS data."""
+
+from typing import Tuple
+
+
+def ifs_settings(sampling="daily") -> Tuple[str, str]:
+    """
+    Check for sampling as only hourly resolution is supported by ifs and return path on HPC systems.
+
+    :param sampling: temporal resolution to load data for, only hourly supported (default "daily")
+
+    :return: data path on the HPC system and the file name pattern to load
+    """
+    if sampling == "hourly":  # pragma: no branch
+        IFS_DATA_PATH = "."
+        FILE_NAMES = "*.nc"
+    else:
+        raise NameError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.")
+    return IFS_DATA_PATH, FILE_NAMES
diff --git a/mlair/helpers/data_sources/era5.py b/mlair/helpers/data_sources/era5.py
index 4a26fa2a2c8450cc0a20fd59ee3aa518f38e7ec2..3e81a460ba58c0d3cfb94412235de608fc43f730 100644
--- a/mlair/helpers/data_sources/era5.py
+++ b/mlair/helpers/data_sources/era5.py
@@ -53,7 +53,7 @@ def load_era5(station_name, stat_var, sampling, data_origin):
 
     # check if all requested variables are available
     if set(stat_var).issubset(station_data.columns) is False:
-        missing_variables = set(stat_var).difference(stat_var)
+        missing_variables = set(stat_var).difference(station_data.columns)
         origin = helpers.select_from_dict(data_origin, missing_variables)
         options = f"station={station_name}, origin={origin}"
         raise EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in JOIN.")
diff --git a/mlair/helpers/data_sources/ifs.py b/mlair/helpers/data_sources/ifs.py
new file mode 100644
index 0000000000000000000000000000000000000000..e9e75bd4e283c943545a09a04bf47feb1eb02a5a
--- /dev/null
+++ b/mlair/helpers/data_sources/ifs.py
@@ -0,0 +1,119 @@
+"""Methods to load ifs data."""
+__author__ = "Lukas Leufen, Michael Langgut"
+__date__ = "2023-06-07"
+
+import logging
+import os
+import re
+import glob
+from functools import partial
+
+import numpy as np
+import pandas as pd
+import xarray as xr
+
+from mlair import helpers
+from mlair.configuration.ifs_settings import ifs_settings
+from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
+from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data, correct_timezone
+from mlair.helpers.data_sources.data_loader import EmptyQueryResult
+from mlair.helpers.meteo import relative_humidity_from_dewpoint
+
+
+def load_ifs(station_name, stat_var, sampling, data_origin):
+    """Load hourly IFS model data for a single station and return (data, meta), or (None, None) if loading fails."""
+
+    # make sure station_name parameter is a list
+    station_name = helpers.to_list(station_name)
+
+    # get data path
+    data_path, file_names = ifs_settings(sampling)
+
+    # correct stat_var values if data is not aggregated (hourly)
+    if sampling == "hourly":
+        stat_var = {key: "values" for key in stat_var.keys()}
+    else:
+        raise ValueError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.")
+
+    # load station meta using toar-data v2 API
+    meta_url_base, headers = toar_data_v2_settings("meta")
+    station_meta = load_station_information(station_name, meta_url_base, headers)
+
+    # sel data for station using sel method nearest
+    logging.info(f"load data for {station_meta['codes'][0]} from IFS")
+    try:
+        lon, lat = station_meta["coordinates"]["lng"], station_meta["coordinates"]["lat"]
+        file_names = sort_ifs_files(data_path)
+        with xr.open_mfdataset(file_names, preprocess=partial(preprocess_ifs_single_file, lon, lat),
+                               concat_dim="initial_time", combine="nested") as data:
+            station_data = data.to_array().T.compute()
+    except OSError as e:
+        logging.info(f"Cannot load ifs data from path {data_path} and filenames {file_names} due to: {e}")
+        return None, None
+
+    # derive relative humidity from dewpoint and temperature if requested
+    if "relhum" in stat_var:
+        relhum = relative_humidity_from_dewpoint(station_data.sel(variable="d2m"),
+                                                 station_data.sel(variable="t2m"))
+        station_data = xr.concat([station_data, relhum.expand_dims({"variable": ["rhw"]})], dim="variable")
+    station_data.coords["variable"] = _rename_ifs_variables(station_data.coords["variable"].values)
+
+    # check if all requested variables are available
+    if set(stat_var).issubset(station_data.coords["variable"].values) is False:
+        missing_variables = set(stat_var).difference(station_data.coords["variable"].values)
+        origin = helpers.select_from_dict(data_origin, missing_variables)
+        options = f"station={station_name}, origin={origin}"
+        raise EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in IFS.")
+    else:
+        station_data = station_data.sel(variable=list(stat_var.keys()))
+
+    # convert to local timezone
+    station_data.coords["initial_time"] = correct_timezone(station_data.sel(lead_time=1).to_pandas(), station_meta,
+                                                           sampling).index
+
+    variable_meta = _emulate_meta_data(station_data)
+    meta = combine_meta_data(station_meta, variable_meta)
+    meta = pd.DataFrame.from_dict(meta, orient='index')
+    meta.columns = station_name
+    return station_data, meta
+
+
+def sort_ifs_files(data_path, pattern="sfc_*.nc"):
+    """Return all files in data_path matching pattern, sorted by the date and hour encoded in the file name."""
+    def sort_by_date(file_name):
+        match = re.search(r'(\d{8})_(\d{2})', file_name)
+        # files without a parseable date sort first instead of crashing sorted() with a None key
+        return (match.group(1), match.group(2)) if match else ("", "")
+    file_names = glob.glob(os.path.join(data_path, pattern))
+    return sorted(file_names, key=sort_by_date)
+
+
+def preprocess_ifs_single_file(lon, lat, ds):
+    """Select lon and lat from data file and transform valid time into lead time."""
+    ds = ds.sel(longitude=lon, latitude=lat, method="nearest", drop=True)
+    return expand_dims_initial_time(ds)
+
+
+def expand_dims_initial_time(ds):
+    """Create lead time from initial time and valid time."""
+    initial_time = ds.time[0]
+    lead_time = (ds.time - initial_time) / np.timedelta64(1, "h")
+    ds.coords["time"] = lead_time
+    ds = ds.rename({"time": "lead_time"})
+    ds = ds.expand_dims(dim={"initial_time": [initial_time.values]}, axis=0)
+    return ds
+
+
+def _emulate_meta_data(station_data):
+    """Build a toar-data-v2-like meta data dict for each variable of the model data."""
+    general_meta = {"sampling_frequency": "hourly", "data_origin": "model", "data_origin_type": "model"}
+    roles_meta = {"roles": [{"contact": {"organisation": {"name": "IFS", "longname": "ECMWF"}}}]}
+    variable_meta = {var: {"variable": {"name": var}, **roles_meta, **general_meta} for var in station_data.coords["variable"].values}
+    return variable_meta
+
+
+def _rename_ifs_variables(ifs_names):
+    """Map IFS variable short names to the JOIN naming convention; unknown names are passed through unchanged."""
+    mapper = {"sp": "press", "u10": "u", "v10": "v", "t2m": "temp", "d2m": "dew", "blh": "pblheight",
+              "tcc": "cloudcover", "rhw": "relhum"}
+    return [mapper.get(name, name) for name in ifs_names]