Commit af151541 authored by lukas leufen's avatar lukas leufen 👻
Browse files

first method collection to load era5 data

parent ff6db061
Pipeline #103011 passed with stages
in 12 minutes and 13 seconds
"""Settings to access not public era5 data."""
from typing import Tuple
def era5_settings(sampling="daily") -> Tuple[str, str]:
"""
Check for sampling as only hourly resolution is supported by era5 and return path on HPC systems.
:param sampling: temporal resolution to load data for, only hourly supported (default "daily")
:return: HPC path
"""
if sampling == "hourly": # pragma: no branch
ERA5_DATA_PATH = "."
FILE_NAMES = "*.nc"
else:
raise NameError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.")
return ERA5_DATA_PATH, FILE_NAMES
"""Methods to load era5 data."""
__author__ = "Lukas Leufen"
__date__ = "2022-06-09"
import logging
import os
import numpy as np
import pandas as pd
import xarray as xr
from mlair import helpers
from mlair.configuration.era5_settings import era5_settings
from mlair.configuration.join_settings import join_settings
from mlair.helpers.join import load_meta_data, EmptyQueryResult
from mlair.helpers.meteo import relative_humidity_from_dewpoint
def load_era5(station_name, stat_var, sampling, data_origin):
# make sure station_name parameter is a list
station_name = helpers.to_list(station_name)
# get data path
data_path, file_names = era5_settings(sampling)
# correct stat_var values if data is not aggregated (hourly)
if sampling == "hourly":
stat_var = {key: "values" for key in stat_var.keys()}
else:
raise ValueError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.")
# get data connection settings
join_url_base, headers = join_settings()
# load series information (lat/lon) from join database
meta = load_meta_data(station_name, None, None, join_url_base, headers)
# sel data for station using sel method nearest
data = xr.open_mfdataset(os.path.join(data_path, file_names))
station_dask = data.sel(lon=meta["station_lon"], lat=meta["station_lat"], method="nearest", drop=True)
station_data = station_dask.to_array().T.compute()
# transform data and meta to pandas
station_data = station_data.to_pandas()
if "relhum" in stat_var:
station_data["RHw"] = relative_humidity_from_dewpoint(station_data["D2M"], station_data["T2M"])
station_data.columns = _rename_era5_variables(station_data.columns)
# check if all requested variables are available
if set(stat_var).issubset(station_data.columns) is False:
missing_variables = set(stat_var).difference(stat_var)
origin = helpers.select_from_dict(data_origin, missing_variables)
options = f"station={station_name}, origin={origin}"
raise EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in JOIN.")
else:
station_data = station_data[stat_var]
meta = pd.DataFrame.from_dict(meta, orient="index", columns=station_name)
return station_data, meta
def _rename_era5_variables(era5_names):
mapper = {"SP": "press", "U10M": "u", "V10M": "v", "T2M": "temp", "D2M": "dew", "BLH": "pblheight",
"TCC": "cloudcover", "RHw": "relhum"}
era5_names = list(era5_names)
try:
join_names = list(map(lambda x: mapper[x], era5_names))
return join_names
except KeyError as e:
raise KeyError(f"Cannot map names from era5 to join naming convention: {e}")
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment