Skip to content
Snippets Groups Projects
Commit 61cf2d17 authored by leufen1's avatar leufen1
Browse files

ifs loader works, but returns data with dim initial_time and lead_time

parent fd584ec4
No related branches found
No related tags found
3 merge requests!522filter can now combine obs, forecast, and apriori for first iteration. Further...,!521Resolve "release v2.4.0",!517Resolve "load ifs data"
Pipeline #141716 failed
"""Settings to access not public era5 data."""
from typing import Tuple
def ifs_settings(sampling="daily") -> Tuple[str, str]:
"""
Check for sampling as only hourly resolution is supported by ifs and return path on HPC systems.
:param sampling: temporal resolution to load data for, only hourly supported (default "daily")
:return: HPC path
"""
if sampling == "hourly": # pragma: no branch
IFS_DATA_PATH = "."
FILE_NAMES = "*.nc"
else:
raise NameError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.")
return IFS_DATA_PATH, FILE_NAMES
......@@ -53,7 +53,7 @@ def load_era5(station_name, stat_var, sampling, data_origin):
# check if all requested variables are available
if set(stat_var).issubset(station_data.columns) is False:
missing_variables = set(stat_var).difference(stat_var)
missing_variables = set(stat_var).difference(station_data.columns)
origin = helpers.select_from_dict(data_origin, missing_variables)
options = f"station={station_name}, origin={origin}"
raise EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in JOIN.")
......
"""Methods to load ifs data."""
__author__ = "Lukas Leufen, Michael Langgut"
__date__ = "2023-06-07"
import logging
import os
import re
import glob
from functools import partial
import numpy as np
import pandas as pd
import xarray as xr
from mlair import helpers
from mlair.configuration.ifs_settings import ifs_settings
from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data, correct_timezone
from mlair.helpers.data_sources.data_loader import EmptyQueryResult
from mlair.helpers.meteo import relative_humidity_from_dewpoint
def load_ifs(station_name, stat_var, sampling, data_origin):
# make sure station_name parameter is a list
station_name = helpers.to_list(station_name)
# get data path
data_path, file_names = ifs_settings(sampling)
# correct stat_var values if data is not aggregated (hourly)
if sampling == "hourly":
stat_var = {key: "values" for key in stat_var.keys()}
else:
raise ValueError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.")
# load station meta using toar-data v2 API
meta_url_base, headers = toar_data_v2_settings("meta")
station_meta = load_station_information(station_name, meta_url_base, headers)
# sel data for station using sel method nearest
logging.info(f"load data for {station_meta['codes'][0]} from IFS")
try:
lon, lat = station_meta["coordinates"]["lng"], station_meta["coordinates"]["lat"]
file_names = sort_ifs_files(data_path)
with xr.open_mfdataset(file_names, preprocess=partial(preprocess_ifs_single_file, lon, lat),
concat_dim="initial_time", combine="nested") as data:
station_data = data.to_array().T.compute()
except OSError as e:
logging.info(f"Cannot load ifs data from path {data_path} and filenames {file_names} due to: {e}")
return None, None
if "relhum" in stat_var:
relhum = relative_humidity_from_dewpoint(station_data.sel(variable="d2m"), station_data.sel(variable="t2m"))
station_data = xr.concat([station_data, relhum.expand_dims({"variable": ["rhw"]})], dim="variable")
station_data.coords["variable"] = _rename_ifs_variables(station_data.coords["variable"].values)
# check if all requested variables are available
if set(stat_var).issubset(station_data.coords["variable"].values) is False:
missing_variables = set(stat_var).difference(station_data.coords["variable"].values)
origin = helpers.select_from_dict(data_origin, missing_variables)
options = f"station={station_name}, origin={origin}"
raise EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in JOIN.")
else:
station_data = station_data.sel(variable=list(stat_var.keys()))
# convert to local timezone
station_data.coords["initial_time"] = correct_timezone(station_data.sel(lead_time=1).to_pandas(), station_meta,
sampling).index
variable_meta = _emulate_meta_data(station_data)
meta = combine_meta_data(station_meta, variable_meta)
meta = pd.DataFrame.from_dict(meta, orient='index')
meta.columns = station_name
return station_data, meta
def sort_ifs_files(data_path, pattern="sfc_*.nc"):
def sort_by_date(file_name):
match = re.search(r'(\d{8})_(\d{2})', file_name)
if match:
return match.group(1), match.group(2)
file_names = glob.glob(os.path.join(data_path, pattern))
return sorted(file_names, key=sort_by_date)
def preprocess_ifs_single_file(lon, lat, ds):
"""Select lon and lat from data file and transform valid time into lead time."""
ds = ds.sel(longitude=lon, latitude=lat, method="nearest", drop=True)
return expand_dims_initial_time(ds)
def expand_dims_initial_time(ds):
"""Create lead time from initial time and valid time."""
initial_time = ds.time[0]
lead_time = (ds.time - initial_time) / np.timedelta64(1, "h")
# ds = ds.expand_dims(dim={"initial_time": [initial_time.values], "lead_time": lead_time}, axis=(0, 1))
ds.coords["time"] = lead_time
ds = ds.rename({"time": "lead_time"})
ds = ds.expand_dims(dim={"initial_time": [initial_time.values]}, axis=0)
return ds
def _emulate_meta_data(station_data):
general_meta = {"sampling_frequency": "hourly", "data_origin": "model", "data_origin_type": "model"}
roles_meta = {"roles": [{"contact": {"organisation": {"name": "IFS", "longname": "ECMWF"}}}]}
variable_meta = {var: {"variable": {"name": var}, **roles_meta, ** general_meta} for var in station_data.coords["variable"].values}
return variable_meta
def _rename_ifs_variables(ifs_names):
mapper = {"sp": "press", "u10": "u", "v10": "v", "t2m": "temp", "d2m": "dew", "blh": "pblheight",
"tcc": "cloudcover", "rhw": "relhum"}
ifs_names = list(ifs_names)
try:
join_names = list(map(lambda x: mapper.get(x, x), ifs_names))
return join_names
except KeyError as e:
raise KeyError(f"Cannot map names from ifs to join naming convention: {e}")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment