Commit cad70f04 authored by lukas leufen 👻
Browse files

add new competitor CAMS

parent 8d3f92f1
Pipeline #104237 passed with stages
in 12 minutes and 1 second
......@@ -17,13 +17,14 @@ class AbstractReferenceModel(ABC):
def __init__(self, *args, **kwargs):
pass
def make_reference_available_locally(self):
def make_reference_available_locally(self, *args):
raise NotImplementedError
@staticmethod
def is_reference_available_locally(reference_path) -> bool:
"""
Checks if reference is available locally
:param reference_path: look in this path for data
"""
try:
......
__author__ = "Lukas Leufen"
__date__ = "2022-06-27"
from mlair.configuration.path_config import check_path_and_create
from mlair.reference_models.abstract_reference_model import AbstractReferenceModel
import os
import xarray as xr
import pandas as pd
class CAMSforecast(AbstractReferenceModel):
    """
    Reference model that extracts per-station forecasts from gridded netCDF files.

    Source files matching ``dma8eu_ENS_FORECAST_*.nc`` (for example
    ``dma8eu_ENS_FORECAST_2019-04-09.nc``) are read from ``data_path``. For each
    requested station the nearest grid cell is selected and written to
    ``ref_store_path`` as ``forecasts_<station>_test.nc``.
    """

    def __init__(self, ref_name: str, ref_store_path: str = None, data_path: str = None):
        """
        Set up names and paths of the reference model.

        :param ref_name: name of the reference; also used as the label on the type dimension
        :param ref_store_path: directory to store the per-station files in; defaults to ``"<ref_name>/"``
        :param data_path: directory holding the gridded source files; defaults to the current working directory
        """
        super().__init__()
        self.ref_name = ref_name
        if ref_store_path is None:
            ref_store_path = f"{self.ref_name}/"
        self.ref_store_path = ref_store_path
        self.data_path = os.path.abspath("." if data_path is None else data_path)
        self.file_pattern = "forecasts_%s_test.nc"
        self.time_dim = "index"
        self.ahead_dim = "ahead"
        self.type_dim = "type"

    def make_reference_available_locally(self, stations):
        """
        Create the per-station reference files that are not yet present in ``ref_store_path``.

        :param stations: mapping of station name to a coordinate mapping providing the keys
            ``"lon"`` and ``"lat"``
        """
        missing_stations = self.list_locally_available_references(
            self.ref_store_path, stations, file_pattern=self.file_pattern)
        if not missing_stations:
            return  # every requested station already has a file, nothing to open or write
        check_path_and_create(self.ref_store_path)
        source_files = os.path.join(self.data_path, "dma8eu_ENS_FORECAST_*.nc")
        # Use a context manager so the handles of all opened source files are released again.
        with xr.open_mfdataset(source_files) as dataset:
            darray = dataset.to_array().sortby(["longitude", "latitude"])
            for station, coords in missing_stations.items():
                lon, lat = coords["lon"], coords["lat"]
                station_data = darray.sel(longitude=lon, latitude=lat, method="nearest", drop=True)
                station_data = station_data.squeeze(drop=True)
                # Load into memory here; the lazily opened source is closed when the with-block ends.
                station_data = station_data.expand_dims(dim={self.type_dim: [self.ref_name]}).compute()
                # Shift time stamps back by one day and lead times up by one.
                # NOTE(review): presumably aligns the source indexing with the convention of the
                # forecast files read elsewhere in the project - confirm.
                station_data.coords[self.time_dim] = station_data.coords[self.time_dim] - pd.Timedelta(days=1)
                station_data.coords[self.ahead_dim] = station_data.coords[self.ahead_dim] + 1
                file_name = self.file_pattern % str(station)
                station_data.to_netcdf(os.path.join(self.ref_store_path, file_name))

    @staticmethod
    def list_locally_available_references(reference_path, stations, file_pattern="forecasts_%s_test.nc") -> dict:
        """
        Return the stations for which no reference file exists in ``reference_path``.

        Despite its name, this method returns the stations that are *missing* locally. A station
        counts as available only if a file named exactly ``file_pattern % station`` exists, so a
        station whose name is a substring of another station's name is not mistaken for it.

        :param reference_path: directory to look in for existing files
        :param stations: mapping of station name to arbitrary values (returned unchanged if the
            directory is empty or does not exist)
        :param file_pattern: ``%``-style pattern with one ``%s`` placeholder for the station name
        :return: mapping restricted to the stations without a local file
        """
        try:
            existing_files = set(os.listdir(reference_path))
        except FileNotFoundError:
            return stations
        if not existing_files:
            return stations
        return {name: value for name, value in stations.items()
                if file_pattern % str(name) not in existing_files}
......@@ -770,8 +770,11 @@ class PostProcessing(RunEnvironment):
file = os.path.join(path, f"forecasts_{station_name}_test.nc")
with xr.open_dataarray(file) as da:
data = da.load()
forecast = data.sel(type=[self.forecast_indicator])
forecast.coords[self.model_type_dim] = [competitor_name]
if self.forecast_indicator in data.coords[self.model_type_dim]:
forecast = data.sel({self.model_type_dim: [self.forecast_indicator]})
forecast.coords[self.model_type_dim] = [competitor_name]
else:
forecast = data.sel({self.model_type_dim: [competitor_name]})
return forecast
def _create_observation(self, data, _, transformation_func: Callable, normalised: bool) -> xr.DataArray:
......
......@@ -369,7 +369,19 @@ class PreProcessing(RunEnvironment):
logging.info("Prepare IntelliO3-ts-v1 model")
from mlair.reference_models.reference_model_intellio3_v1 import IntelliO3_ts_v1
path = os.path.join(self.data_store.get("competitor_path"), competitor_name)
IntelliO3_ts_v1("IntelliO3-ts-v1", path).make_reference_available_locally(remove_tmp_dir=False)
IntelliO3_ts_v1("IntelliO3-ts-v1", ref_store_path=path).make_reference_available_locally(remove_tmp_dir=False)
elif competitor_name.lower() == "CAMS".lower():
logging.info("Prepare CAMS forecasts")
from mlair.reference_models.reference_model_cams import CAMSforecast
data_path = self.data_store.get_default("cams_data_path", default=None)
path = os.path.join(self.data_store.get("competitor_path"), competitor_name)
stations = {}
for subset in ["train", "val", "test"]:
data_collection = self.data_store.get("data_collection", subset)
stations.update({str(s): s.get_coordinates() for s in data_collection if s not in stations})
CAMSforecast("CAMS", ref_store_path=path, data_path=data_path).make_reference_available_locally(stations)
else:
logging.info("No preparation required because no competitor was provided to the workflow.")
......
Supports Markdown
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment