In [None]:
import os, sys
sys.path.append("../utils/")
import xarray as xr
import numpy as np
import pandas as pd

from statistical_evaluation import perform_block_bootstrap_metric, avg_metrics, calculate_cond_quantiles, Scores

# Evaluation of the ERA5 short-range forecasts

Define the path to the file and load the data.

In [None]:
indir = "/p/home/jusers/langguth1/juwels/video_prediction_shared_folder/results/era5-Y2007-2019M01to12-92x56-3840N0000E-2t_tcc_t_850/era5_forecast"
yr=2019

era5_fcst_file = os.path.join(indir, "{0}_era5_short_range_fcst.nc".format(yr))

if not os.path.isfile(era5_fcst_file):
    raise FileNotFoundError("Could not find file with all ERA5 forecasts '{0}'".format(era5_fcst_file))

In [None]:
era5_fcst = xr.open_dataset(era5_fcst_file)
print(era5_fcst)

Next we initialize the function for calculating the MSE and call it to evaluate the ERA5 and persistence forecasts. <br>
If you require further evaluation metrics, just expand the cell accordingly, e.g. add the following lines <br>
```
ssim_func = Scores("ssim", ["lat", "lon"]).score_func 

ssim_era5_all = ssim_func(data_fcst=era5_fcst[varname_fcst], data_ref=era5_fcst[varname_ref])
ssim_per_all = (data_fcst=era5_fcst[varname_per], data_ref=era5_fcst[varname_ref])
```
in case you want to evaluate the SSIM as well.

In [None]:
fdata_clim = os.path.join("/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/T2monthly/", "climatology_t2m_1991-2020.nc")

data = xr.open_dataset(fdata_clim)

# copied from load_climdata in main_visualize_postprocess.py
var="var167"
dt_clim = data[var]

lon_dom = np.arange(0., 27.5, 0.3)
lat_dom = np.arange(54.9, 38.1, -0.3)

# get the coordinates of the data after running CDO
coords = dt_clim.coords
nlat, nlon = len(coords["lat"]), len(coords["lon"])
# modify it our needs
coords_new = dict(coords)
coords_new.pop("time")
coords_new["month"] = np.arange(1, 13)
coords_new["hour"] = np.arange(0, 24)
# initialize a new data array with explicit dimensions for month and hour
data_clim_new = xr.DataArray(
    np.full((12, 24, nlat, nlon), np.nan),
    coords=coords_new,
    dims=["month", "hour", "lat", "lon"],
)
# do the reorganization
for month in np.arange(1, 13):
    data_clim_new.loc[dict(month=month)] = dt_clim.sel(
        time=dt_clim["time.month"] == month
    )

data_clim = data_clim_new.sel(lon=lons, lat=lats, tolerance=0.01, method="nearest")
print(data_clim["lat"])

In [None]:
from tqdm import tqdm
from skimage.metrics import structural_similarity as ssim

# overwrite some score-functions which inherently expect a separate fcst_hour-dimension next to batch_size 

def calc_acc_batch(data_fcst, data_ref,  **kwargs):
    """
    Calculate acc ealuation metric of forecast data w.r.t reference data
    :param data_fcst: forecasted data (xarray with dimensions [batch, fore_hours, lat, lon])
    :param data_ref: reference data (xarray with dimensions [batch, fore_hours, lat, lon])
    :param data_clim: climatology data (xarray with dimensions [monthly, hourly, lat, lon])
    :return: averaged acc for each batch example [batch, fore_hours]
    """
    
    print("Start calculating ACC")
    if "data_clim" in kwargs:
        data_clim = kwargs["data_clim"]
    else:
        raise KeyError("%{0}: climatological data must be parsed to calculate the ACC.".format(method))        

    batch_size = data_fcst.shape[0]
    acc = np.ones([batch_size])*np.nan
    for i in tqdm(range(batch_size)):
        img_fcst = data_fcst[i, ...]
        img_ref = data_ref[i, ...]
        # get the forecast time
        img_month = img_fcst["fcst_hour"].dt.month.values
        img_hour = img_fcst["fcst_hour"].dt.hour.values
        img_clim = data_clim.sel(month=img_month, hour=img_hour)               

        img1_ = img_ref - img_clim
        img2_ = img_fcst - img_clim
        cor1 = np.sum(img1_*img2_)
        cor2 = np.sqrt(np.sum(img1_**2)*np.sum(img2_**2))
        acc[i] = cor1/cor2
        
    # convert to data array 
    acc = xr.DataArray(acc, coords={"fcst_hour": data_fcst["fcst_hour"]}, dims=["fcst_hour"])
    return acc

def calc_ssim_batch(data_fcst, data_ref, **kwargs):
    """
    Calculate ssim ealuation metric of forecast data w.r.t reference data
    :param data_fcst: forecasted data (xarray with dimensions [batch, fore_hours, lat, lon])
    :param data_ref: reference data (xarray with dimensions [batch, fore_hours, lat, lon])
    :return: averaged ssim for each batch example, shape is [batch,fore_hours]
    """
    method = Scores.calc_ssim_batch.__name__
    batch_size = np.array(data_ref).shape[0]
    ssim_pred = []
    for i in tqdm(range(batch_size)):
        ssim_pred.append(ssim(data_ref[i, ...],data_fcst[i,...]))
        
    # convert to data array 
    ssim_pred = xr.DataArray(np.asarray(ssim_pred), coords={"fcst_hour": data_fcst["fcst_hour"]}, dims=["fcst_hour"])    
    return ssim_pred

In [None]:
# to get and configure the score-functions
score_dims = ["lat", "lon"]
scores = ["mse", "ssim", "acc", "texture"]
# the reference data
varname_ref, varname_fcst, varname_per = "2t_ref", "2t_era5_fcst", "2t_persistence_fcst"

# initialize empty dictionaries to store the score functions and to save the corresponding results
score_data = {}
for score in scores:
    # overwrite functions that are incompatble here
    if score == "acc":
        score_func = calc_acc_batch
    elif score == "ssim":
        score_func = calc_ssim_batch
    else:
        score_func = Scores(score, score_dims).score_func
    # parsing the persistence forecast as float32 increases throughput by a factor of 10!
    score_data[score] = {"era5_all": score_func(data_fcst=era5_fcst[varname_fcst], data_ref=era5_fcst[varname_ref], data_clim=data_clim,),
                         "per_all": score_func(data_fcst=era5_fcst[varname_per].astype("float32"), data_ref=era5_fcst[varname_ref], data_clim=data_clim)}   

In [None]:
print(score_data["mse"]["era5_all"])
print(score_data["ssim"]["era5_all"])
print(score_data["texture"]["era5_all"])

Next, we initialize the data arrays to store the metrics for each forecast hour. <br>
Note that the ERA5 short-range forecasts only start twice a day at 06 and 18 UTC, respectively. Besides, the have only data starting from lead time 6 hours, but for consistency with the video prediction models, the data arrays cover all lead times between forecast hour 1 and 12. The unavailable values will be set to None.

In [None]:
init_times = [6, 18]
fcst_hours = np.arange(1, 13)
nhours = len(fcst_hours)
nboots=1000

# MSE
mse_era5_fcst = xr.DataArray(np.empty(nhours, dtype=object), coords={"fcst_hour": fcst_hours}, dims=["fcst_hour"])
mse_era5_fcst_boot = xr.DataArray(np.empty((nhours, nboots), dtype=object),
                                  coords={"fcst_hour": fcst_hours, "iboot": np.arange(nboots)},
                                  dims=["fcst_hour", "iboot"])

mse_per_fcst = xr.DataArray(np.empty(nhours, dtype=object), coords={"fcst_hour": fcst_hours}, dims=["fcst_hour"])
mse_per_fcst_boot = xr.DataArray(np.empty((nhours, nboots), dtype=object),
                                 coords={"fcst_hour": fcst_hours, "iboot": np.arange(nboots)},
                                 dims=["fcst_hour", "iboot"])

# SSMI
ssim_era5_fcst, ssim_per_fcst = mse_era5_fcst.copy(), mse_per_fcst.copy()
ssim_era5_fcst_boot, ssim_per_fcst_boot = mse_per_fcst_boot.copy(), mse_per_fcst_boot.copy()

# ACC
acc_era5_fcst, acc_per_fcst = mse_era5_fcst.copy(), mse_per_fcst.copy()
acc_era5_fcst_boot, acc_per_fcst_boot = mse_per_fcst_boot.copy(), mse_per_fcst_boot.copy()

# ACC
txtr_era5_fcst, txtr_per_fcst = mse_era5_fcst.copy(), mse_per_fcst.copy()
txtr_era5_fcst_boot, txtr_per_fcst_boot = mse_per_fcst_boot.copy(), mse_per_fcst_boot.copy()


Finally, we populate the initialized data arrays by looping over the forecast hours for which data is available. <br>
Additionally, we perform block bootstrapping to estimate the uncertainty of our evaluation metrics.

In [None]:
def handle_scores(scores1_all, scores_ref_all, fhh):
    
    scores1 = scores1_all.sel(fcst_hour=(scores1_all.fcst_hour.dt.hour.isin(fhh)))
    scores_ref = scores_ref_all.sel(fcst_hour=(scores_ref_all.fcst_hour.dt.hour.isin(fhh)))
    score1_mean, score_ref_mean = scores1.mean(), scores_ref.mean()
    # two runs per day -> 2*7 correpsonds to a block length of one week
    score1_boot = perform_block_bootstrap_metric(scores1, "fcst_hour", 2*7)
    score_ref_boot = perform_block_bootstrap_metric(scores_ref, "fcst_hour", 2*7)
    
    return score1_mean, score_ref_mean, score1_boot, score_ref_boot
    

for fh in fcst_hours[5::]:
    print("Handling scores for forecast hour '{0:0d}'".format(fh))
    fh_curr = (init_times + fh)%24
    # MSE
    mse_era5_fcst[fh-1], mse_per_fcst[fh-1], mse_era5_fcst_boot[fh-1, :], mse_per_fcst_boot[fh-1, :] = handle_scores(score_data["mse"]["era5_all"],
                                                                                                                     score_data["mse"]["per_all"], fh_curr)
    # SSIM
    ssim_era5_fcst[fh-1], ssim_per_fcst[fh-1], ssim_era5_fcst_boot[fh-1, :], ssim_per_fcst_boot[fh-1, :] = handle_scores(score_data["ssim"]["era5_all"],
                                                                                                                         score_data["ssim"]["per_all"], fh_curr)
    # ACC
    acc_era5_fcst[fh-1], acc_per_fcst[fh-1], acc_era5_fcst_boot[fh-1, :], acc_per_fcst_boot[fh-1, :] = handle_scores(score_data["acc"]["era5_all"],
                                                                                                                     score_data["acc"]["per_all"], fh_curr)  
    # TEXTURE
    txtr_era5_fcst[fh-1], txtr_per_fcst[fh-1], txtr_era5_fcst_boot[fh-1, :], txtr_per_fcst_boot[fh-1, :] = handle_scores(score_data["texture"]["era5_all"],
                                                                                                                         score_data["texture"]["per_all"], fh_curr)
    
    #mse_era5_curr = mse_era5_all.sel(fcst_hour=(mse_era5_all.fcst_hour.dt.hour.isin(fh_curr)))
    #mse_per_curr = mse_per_all.sel(fcst_hour=(mse_per_all.fcst_hour.dt.hour.isin(fh_curr)))
    #mse_era5_fcst[fh-1], mse_per_fcst[fh-1] = mse_era5_curr.mean(), mse_per_curr.mean()
    ## two runs per day -> 2*7 correpsonds to a block length of one week
    #mse_era5_fcst_boot[fh-1, :] = perform_block_bootstrap_metric(mse_era5_curr, "fcst_hour", 2*7)
    #mse_per_fcst_boot[fh-1, :] = perform_block_bootstrap_metric(mse_per_curr, "fcst_hour", 2*7)


In [None]:
print(mse_era5_fcst)
print(ssim_era5_fcst)
print(acc_era5_fcst)
print(txtr_era5_fcst)

Finally, we put the data arrays into a joint dataset and save the results into the netCDF-file.

In [None]:
# create Dataset and save to netCDF-file
ds_mse = xr.Dataset({"2t_era5_mse_avg": mse_era5_fcst, "2t_era5_mse_bootstrapped": mse_era5_fcst_boot, 
                     "2t_persistence_mse_avg": mse_per_fcst, "2t_persistence_mse_bootstrapped": mse_per_fcst_boot,
                    # SSIM
                    "2t_era5_ssim_avg": ssim_era5_fcst, "2t_era5_mse_bootstrapped": ssim_era5_fcst_boot, 
                    "2t_persistence_ssim_avg": ssim_per_fcst, "2t_persistence_mse_bootstrapped": ssim_per_fcst_boot,
                    # ACC
                    "2t_era5_acc_avg": acc_era5_fcst, "2t_era5_acc_bootstrapped": acc_era5_fcst_boot, 
                    "2t_persistence_acc_avg": acc_per_fcst, "2t_persistence_acc_bootstrapped": acc_per_fcst_boot,
                    # TEXTURE 
                    "2t_era5_texture_avg": txtr_era5_fcst, "2t_era5_texture_bootstrapped": txtr_era5_fcst_boot, 
                    "2t_persistence_texture_avg": txtr_per_fcst, "2t_persistence_texture_bootstrapped": txtr_per_fcst_boot,                     
                    })

outfile = os.path.join(indir, "evaluation_metrics.nc")

print("Save evaluation metrics to '{0}'".format(outfile))
ds_mse.to_netcdf(outfile)

## DONE!