Commit 16851339 authored by lukas leufen

data handlers can now use ERA5 data, refactoring still required

parent af151541
Pipeline #103018 passed with stages in 11 minutes and 58 seconds
@@ -20,7 +20,7 @@ import xarray as xr
from mlair.configuration import check_path_and_create
from mlair import helpers
from mlair.helpers import join, statistics, TimeTrackingWrapper
from mlair.helpers import join, statistics, TimeTrackingWrapper, filter_dict_by_value, select_from_dict, era5
from mlair.data_handler.abstract_data_handler import AbstractDataHandler
# define a more general date type for type hinting
@@ -340,34 +340,37 @@ class DataHandlerSingleStation(AbstractDataHandler):
os.remove(meta_file)
data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
station_type=station_type, network=network,
store_data_locally=store_data_locally, data_origin=data_origin)
store_data_locally=store_data_locally, data_origin=data_origin,
time_dim=self.time_dim, target_dim=self.target_dim,
iter_dim=self.iter_dim)
logging.debug(f"loaded new data")
else:
try:
logging.debug(f"try to load local data from: {file_name}")
data = xr.open_dataarray(file_name)
meta = pd.read_csv(meta_file, index_col=0)
self.check_station_meta(meta, station, station_type, network)
self.check_station_meta(meta, station, station_type, network, data_origin)
logging.debug("loading finished")
except FileNotFoundError as e:
logging.debug(e)
logging.debug(f"load new data")
data, meta = self.download_data(file_name, meta_file, station, statistics_per_var, sampling,
station_type=station_type, network=network,
store_data_locally=store_data_locally, data_origin=data_origin)
store_data_locally=store_data_locally, data_origin=data_origin,
time_dim=self.time_dim, target_dim=self.target_dim,
iter_dim=self.iter_dim)
logging.debug("loading finished")
# create slices and check for negative concentration.
data = self._slice_prep(data, start=start, end=end)
data = self.check_for_negative_concentrations(data)
return data, meta
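The method above follows a cache-or-download pattern: a locally stored file is reused when it exists and its meta data matches the request, otherwise the data is fetched anew. A minimal standalone sketch of that pattern (`download_fn` is a hypothetical stand-in for `download_data`):

```python
import logging
import os

import pandas as pd
import xarray as xr


def load_or_download(file_name, meta_file, download_fn, overwrite=False):
    # force a fresh download by discarding the cached file first
    if overwrite and os.path.exists(file_name):
        os.remove(file_name)
    try:
        data = xr.open_dataarray(file_name)  # raises FileNotFoundError if no cache
        meta = pd.read_csv(meta_file, index_col=0)
    except FileNotFoundError as e:
        logging.debug(e)
        data, meta = download_fn(file_name, meta_file)  # caller-provided fallback
    return data, meta
```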
@staticmethod
def download_data_from_join(file_name: str, meta_file: str, station, statistics_per_var, sampling,
def download_data(self, file_name: str, meta_file: str, station, statistics_per_var, sampling,
station_type=None, network=None, store_data_locally=True, data_origin: Dict = None,
time_dim=DEFAULT_TIME_DIM, target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM) \
-> [xr.DataArray, pd.DataFrame]:
"""
Download data from TOAR database using the JOIN interface.
Download data from TOAR database using the JOIN interface or load local era5 data.
Data is transformed to an xarray dataset. If class attribute store_data_locally is true, data is additionally
stored locally using given names for file and meta file.
@@ -378,8 +381,32 @@ class DataHandlerSingleStation(AbstractDataHandler):
:return: downloaded data and its meta data
"""
df_all = {}
df, meta = join.download_join(station_name=station, stat_var=statistics_per_var, station_type=station_type,
network_name=network, sampling=sampling, data_origin=data_origin)
df_era5, df_join = None, None
meta_era5, meta_join = None, None
if data_origin is not None:
era5_origin = filter_dict_by_value(data_origin, "era5", True)
era5_stats = select_from_dict(statistics_per_var, era5_origin.keys())
join_origin = filter_dict_by_value(data_origin, "era5", False)
join_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False)
assert len(era5_origin) + len(join_origin) == len(data_origin)
assert len(era5_stats) + len(join_stats) == len(statistics_per_var)
else:
era5_origin, join_origin = None, None
era5_stats, join_stats = statistics_per_var, statistics_per_var
# load data
if era5_origin is not None and len(era5_origin) > 0:
# load era5 data
df_era5, meta_era5 = era5.load_era5(station_name=station, stat_var=era5_stats, sampling=sampling,
data_origin=era5_origin)
if join_origin is None or len(join_stats.keys()) > 0:
# load join data
df_join, meta_join = join.download_join(station_name=station, stat_var=join_stats, station_type=station_type,
network_name=network, sampling=sampling, data_origin=join_origin)
df = pd.concat([df_era5, df_join], axis=1, sort=True)
meta = meta_era5 if meta_era5 is not None else meta_join
meta.loc["data_origin"] = str(data_origin)
df_all[station[0]] = df
# convert df_all to xarray
xarr = {k: xr.DataArray(v, dims=[time_dim, target_dim]) for k, v in df_all.items()}
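`filter_dict_by_value` and `select_from_dict` come from `mlair.helpers`; judging only from how they are called here, they plausibly behave like the sketch below (not the MLAir implementation), which splits the requested variables into an ERA5 share and a JOIN share:

```python
def filter_dict_by_value(dictionary, value, filter_cond):
    # keep entries whose value equals `value` (True) or differs from it (False)
    return {k: v for k, v in dictionary.items() if (v == value) == filter_cond}


def select_from_dict(dictionary, keys, filter_cond=True):
    # keep entries whose key is in `keys` (True) or not in `keys` (False)
    keys = set(keys)
    return {k: v for k, v in dictionary.items() if (k in keys) == filter_cond}


# example split, mirroring download_data above (values are illustrative only)
data_origin = {"o3": "", "temp": "era5", "relhum": "era5"}
statistics_per_var = {"o3": "dma8eu", "temp": "average_values", "relhum": "average_values"}
era5_origin = filter_dict_by_value(data_origin, "era5", True)  # {'temp': 'era5', 'relhum': 'era5'}
era5_stats = select_from_dict(statistics_per_var, era5_origin.keys())
join_stats = select_from_dict(statistics_per_var, era5_origin.keys(), filter_cond=False)  # {'o3': 'dma8eu'}
```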
@@ -390,22 +417,17 @@ class DataHandlerSingleStation(AbstractDataHandler):
meta.to_csv(meta_file)
return xarr, meta
def download_data(self, *args, **kwargs):
data, meta = self.download_data_from_join(*args, **kwargs, time_dim=self.time_dim, target_dim=self.target_dim,
iter_dim=self.iter_dim)
return data, meta
@staticmethod
def check_station_meta(meta, station, station_type, network):
def check_station_meta(meta, station, station_type, network, data_origin):
"""
Search for the entries in the meta data and compare the stored values with the requested ones.
Will raise a FileNotFoundError if the values mismatch.
"""
if station_type is not None:
check_dict = {"station_type": station_type, "network_name": network}
check_dict = {"station_type": station_type, "network_name": network, "data_origin": str(data_origin)}
for (k, v) in check_dict.items():
if v is None:
if v is None or k not in meta:
continue
if meta.at[k, station[0]] != v:
logging.debug(f"meta data does not agree with given request for {k}: {v} (requested) != "
......
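The mismatch branch is cut off above; its effect is to raise a `FileNotFoundError`, which the `try`/`except` in the loading code interprets as "no usable local data" and answers with a fresh download. A hedged sketch of that contract (the index lookup is an assumption; the committed code tests `k not in meta` directly):

```python
import pandas as pd


def check_station_meta(meta: pd.DataFrame, station, station_type, network, data_origin):
    # compare stored meta data against the requested values; mismatch forces a re-download
    check_dict = {"station_type": station_type, "network_name": network,
                  "data_origin": str(data_origin)}
    for k, v in check_dict.items():
        if v is None or k not in meta.index:
            continue  # nothing requested for this key, or key not stored locally
        if meta.at[k, station[0]] != v:
            raise FileNotFoundError(
                f"meta data does not agree with given request for {k}: "
                f"{v} (requested) != {meta.at[k, station[0]]} (local)")
```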
@@ -31,9 +31,8 @@ def load_era5(station_name, stat_var, sampling, data_origin):
raise ValueError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.")
# get data connection settings
join_url_base, headers = join_settings()
# load series information (lat/lon) from join database
join_url_base, headers = join_settings()
meta = load_meta_data(station_name, None, None, join_url_base, headers)
# sel data for station using sel method nearest
......
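The truncated part selects the ERA5 grid cell closest to the station position obtained from `load_meta_data`; with xarray this is typically done via `.sel(..., method="nearest")`. A minimal sketch, where the file path and coordinate names (`lat`/`lon`) are assumptions for illustration:

```python
import xarray as xr


def select_nearest_grid_cell(era5_file, station_lat, station_lon):
    ds = xr.open_dataset(era5_file)  # hypothetical local ERA5 file
    # pick the grid cell whose coordinates are closest to the station
    return ds.sel(lat=station_lat, lon=station_lon, method="nearest")
```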
@@ -44,7 +44,10 @@ def download_join(station_name: Union[str, List[str]], stat_var: dict, station_t
station_name = helpers.to_list(station_name)
# also ensure that given data_origin dict is no reference
data_origin = None if data_origin is None else {k: v for (k, v) in data_origin.items()}
if data_origin is None or len(data_origin) == 0:
data_origin = None
else:
data_origin = {k: v for (k, v) in data_origin.items()}
# get data connection settings
join_url_base, headers = join_settings(sampling)
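The reworked guard treats an empty `data_origin` like `None` and otherwise shallow-copies the dict so that later modifications cannot leak back into the caller's object. An equivalent one-liner (a sketch, not the committed code):

```python
# normalize: empty mapping behaves like None; otherwise decouple from the caller
data_origin = dict(data_origin) if data_origin else None
```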
@@ -153,6 +156,24 @@ def retries_session(max_retries=3):
return http
def load_meta_data(station_name: List[str], station_type: str_or_none, network_name: str_or_none,
join_url_base: str, headers: Dict) -> Dict:
opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type,
"network_name": network_name, "as_dict": "true",
"columns": "station_id,network_name,station_local_id,station_type,station_type_of_area,station_category,"
"station_name,station_country,station_state,station_lon,station_lat,station_alt,"
"station_timezone,station_nightlight_5km,station_climatic_zone,station_wheat_production,"
"station_rice_production,station_nox_emissions,station_omi_no2_column,station_toar_category,"
"station_htap_region,station_reported_alt,station_alt_flag,station_coordinate_status,"
"station_google_alt,station_etopo_alt,station_etopo_min_alt_5km,station_etopo_relative_alt,"
"station_dominant_landcover,station_landcover_description,station_max_nightlight_25km,"
"station_max_population_density_25km,station_nightlight_1km,station_population_density,"
"google_resolution,station_comments,station_max_population_density_5km"}
if network_name is None:
opts["columns"] = opts["columns"].replace(",network_name", "")
return get_data(opts, headers)[-1]
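A hedged usage example for the new `load_meta_data` helper; the station id is MLAir's usual example station, the returned keys are assumed from the requested column list, and the `replace(",network_name", "")` trick works because `network_name` is never the first column in that string:

```python
from mlair.helpers import join  # as imported at the top of the data handler

join_url_base, headers = join.join_settings()  # connection settings, as used in the diff
meta = join.load_meta_data(["DEBW107"], None, None, join_url_base, headers)
print(meta.get("station_lat"), meta.get("station_lon"))  # keys assumed from the column list
```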
def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none,
join_url_base: str, headers: Dict, data_origin: Dict = None) -> [Dict, Dict]:
"""
......