Skip to content
Snippets Groups Projects
Select Git revision
  • d68daa628db70cf0ff0cbe4e786832fc1c93e5a2
  • master default protected
  • enxhi_issue460_remove_TOAR-I_access
  • michael_issue459_preprocess_german_stations
  • sh_pollutants
  • develop protected
  • release_v2.4.0
  • michael_issue450_feat_load-ifs-data
  • lukas_issue457_feat_set-config-paths-as-parameter
  • lukas_issue454_feat_use-toar-statistics-api-v2
  • lukas_issue453_refac_advanced-retry-strategy
  • lukas_issue452_bug_update-proj-version
  • lukas_issue449_refac_load-era5-data-from-toar-db
  • lukas_issue451_feat_robust-apriori-estimate-for-short-timeseries
  • lukas_issue448_feat_load-model-from-path
  • lukas_issue447_feat_store-and-load-local-clim-apriori-data
  • lukas_issue445_feat_data-insight-plot-monthly-distribution
  • lukas_issue442_feat_bias-free-evaluation
  • lukas_issue444_feat_choose-interp-method-cams
  • 414-include-crps-analysis-and-other-ens-verif-methods-or-plots
  • lukas_issue384_feat_aqw-data-handler
  • v2.4.0 protected
  • v2.3.0 protected
  • v2.2.0 protected
  • v2.1.0 protected
  • Kleinert_etal_2022_initial_submission
  • v2.0.0 protected
  • v1.5.0 protected
  • v1.4.0 protected
  • v1.3.0 protected
  • v1.2.1 protected
  • v1.2.0 protected
  • v1.1.0 protected
  • IntelliO3-ts-v1.0_R1-submit
  • v1.0.0 protected
  • v0.12.2 protected
  • v0.12.1 protected
  • v0.12.0 protected
  • v0.11.0 protected
  • v0.10.0 protected
  • IntelliO3-ts-v1.0_initial-submit
41 results

mlt_modules_hdfml.sh

Blame
  • statistics.py 10.05 KiB
    from scipy import stats
    
    from src.run_modules.run_environment import RunEnvironment
    
    __author__ = 'Lukas Leufen, Felix Kleinert'
    __date__ = '2019-10-23'
    
    import numpy as np
    import xarray as xr
    import pandas as pd
    from typing import Union, Tuple
    
    
    Data = Union[xr.DataArray, pd.DataFrame]
    
    
    def apply_inverse_transformation(data, mean, std=None, method="standardise"):
        if method == 'standardise':
            return standardise_inverse(data, mean, std)
        elif method == 'centre':
            return centre_inverse(data, mean)
        elif method == 'normalise':
            # use min/max of data or given min/max
            raise NotImplementedError
        else:
            raise NotImplementedError
    
    
    def standardise(data: Data, dim: Union[str, int]) -> Tuple[Data, Data, Data]:
        """
        This function standardises a xarray.dataarray (along dim) or pandas.DataFrame (along axis) with mean=0 and std=1
        :param data:
        :param string/int dim:
                | for xarray.DataArray as string: name of dimension which should be standardised
                | for pandas.DataFrame as int: axis of dimension which should be standardised
        :return: xarray.DataArrays or pandas.DataFrames:
                #. mean: Mean of data
                #. std: Standard deviation of data
                #. data: Standardised data
        """
        return data.mean(dim), data.std(dim), (data - data.mean(dim)) / data.std(dim)
    
    
    def standardise_inverse(data: Data, mean: Data, std: Data) -> Data:
        """
        This is the inverse function of `standardise` and therefore vanishes the standardising.
        :param data:
        :param mean:
        :param std:
        :return:
        """
        return data * std + mean
    
    
    def centre(data: Data, dim: Union[str, int]) -> Tuple[Data, None, Data]:
        """
        This function centres a xarray.dataarray (along dim) or pandas.DataFrame (along axis) to mean=0
        :param data:
        :param string/int dim:
                | for xarray.DataArray as string: name of dimension which should be standardised
                | for pandas.DataFrame as int: axis of dimension which should be standardised
        :return: xarray.DataArrays or pandas.DataFrames:
                #. mean: Mean of data
                #. std: Standard deviation of data
                #. data: Standardised data
        """
        return data.mean(dim), None, data - data.mean(dim)
    
    
    def centre_inverse(data: Data, mean: Data) -> Data:
        """
        This function is the inverse function of `centre` and therefore adds the given values of mean to the data.
        :param data:
        :param mean:
        :return:
        """
        return data + mean
    
    
    def mean_squared_error(a, b):
        return np.square(a - b).mean()
    
    
    class SkillScores(RunEnvironment):
    
        def __init__(self, internal_data):
            super().__init__()
            self.internal_data = internal_data
    
        def skill_scores(self, window_lead_time):
            ahead_names = list(range(1, window_lead_time + 1))
            skill_score = pd.DataFrame(index=['cnn-persi', 'ols-persi', 'cnn-ols'])
            for iahead in ahead_names:
                data = self.internal_data.sel(ahead=iahead)
                skill_score[iahead] = [self.general_skill_score(data, forecast_name="CNN", reference_name="persi"),
                                       self.general_skill_score(data, forecast_name="OLS", reference_name="persi"),
                                       self.general_skill_score(data, forecast_name="CNN", reference_name="OLS")]
            return skill_score
    
        def climatological_skill_scores(self, external_data, window_lead_time):
            ahead_names = list(range(1, window_lead_time + 1))
    
            all_terms = ['AI', 'AII', 'AIII', 'AIV', 'BI', 'BII', 'BIV', 'CI', 'CIV', 'CASE I', 'CASE II', 'CASE III',
                         'CASE IV']
            skill_score = xr.DataArray(np.full((len(all_terms), len(ahead_names)), np.nan), coords=[all_terms, ahead_names],
                                       dims=['terms', 'ahead'])
    
            for iahead in ahead_names:
    
                data = self.internal_data.sel(ahead=iahead)
    
                skill_score.loc[["CASE I", "AI", "BI", "CI"], iahead] = np.stack(self._climatological_skill_score(
                    data, mu_type=1, forecast_name="CNN").values.flatten())
    
                skill_score.loc[["CASE II", "AII", "BII"], iahead] = np.stack(self._climatological_skill_score(
                    data, mu_type=2, forecast_name="CNN").values.flatten())
    
                if external_data is not None:
                    skill_score.loc[["CASE III", "AIII"], iahead] = np.stack(self._climatological_skill_score(
                        data, mu_type=3, forecast_name="CNN",
                        external_data=external_data).values.flatten())
    
                    skill_score.loc[["CASE IV", "AIV", "BIV", "CIV"], iahead] = np.stack(self._climatological_skill_score(
                        data, mu_type=4, forecast_name="CNN",
                        external_data=external_data).values.flatten())
    
            return skill_score
    
        def _climatological_skill_score(self, data, mu_type=1, observation_name="obs", forecast_name="CNN", external_data=None):
            kwargs = {"external_data": external_data} if external_data is not None else {}
            return self.__getattribute__(f"skill_score_mu_case_{mu_type}")(data, observation_name, forecast_name, **kwargs)
    
        @staticmethod
        def general_skill_score(data, observation_name="obs", forecast_name="CNN", reference_name="persi"):
            data = data.dropna("index")
            observation = data.sel(type=observation_name)
            forecast = data.sel(type=forecast_name)
            reference = data.sel(type=reference_name)
            mse = mean_squared_error
            skill_score = 1 - mse(observation, forecast) / mse(observation, reference)
            return skill_score.values
    
        @staticmethod
        def skill_score_pre_calculations(data, observation_name, forecast_name):
    
            data = data.loc[..., [observation_name, forecast_name]].drop("ahead")
            data = data.dropna("index")
    
            mean = data.mean("index")
            sigma = np.sqrt(data.var("index"))
            # r, p = stats.spearmanr(data.loc[..., [forecast_name, observation_name]])
            r, p = stats.pearsonr(data.loc[..., forecast_name], data.loc[..., observation_name])
    
            AI = np.array(r ** 2)
            BI = ((r - (sigma.loc[..., forecast_name] / sigma.loc[..., observation_name])) ** 2).values
            CI = (((mean.loc[..., forecast_name] - mean.loc[..., observation_name]) / sigma.loc[
                ..., observation_name]) ** 2).values
    
            suffix = {"mean": mean, "sigma": sigma, "r": r, "p": p}
            return AI, BI, CI, data, suffix
    
        def skill_score_mu_case_1(self, data, observation_name="obs", forecast_name="CNN"):
            AI, BI, CI, data, _ = self.skill_score_pre_calculations(data, observation_name, forecast_name)
            skill_score = np.array(AI - BI - CI)
            return pd.DataFrame({"skill_score": [skill_score], "AI": [AI], "BI": [BI], "CI": [CI]}).to_xarray().to_array()
    
        def skill_score_mu_case_2(self, data, observation_name="obs", forecast_name="CNN"):
            AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name)
            monthly_mean = self.create_monthly_mean_from_daily_data(data)
            data = xr.concat([data, monthly_mean], dim="type")
            sigma = suffix["sigma"]
            sigma_monthly = np.sqrt(monthly_mean.var())
            # r, p = stats.spearmanr(data.loc[..., [observation_name, observation_name + "X"]])
            r, p = stats.pearsonr(data.loc[..., observation_name], data.loc[..., observation_name + "X"])
            AII = np.array(r ** 2)
            BII = ((r - sigma_monthly / sigma.loc[observation_name]) ** 2).values
            skill_score = np.array((AI - BI - CI - AII + BII) / (1 - AII + BII))
            return pd.DataFrame({"skill_score": [skill_score], "AII": [AII], "BII": [BII]}).to_xarray().to_array()
    
        def skill_score_mu_case_3(self, data, observation_name="obs", forecast_name="CNN", external_data=None):
            AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name)
            mean, sigma = suffix["mean"], suffix["sigma"]
            AIII = (((external_data.mean().values - mean.loc[observation_name]) / sigma.loc[observation_name])**2).values
            skill_score = np.array((AI - BI - CI + AIII) / 1 + AIII)
            return pd.DataFrame({"skill_score": [skill_score], "AIII": [AIII]}).to_xarray().to_array()
    
        def skill_score_mu_case_4(self, data, observation_name="obs", forecast_name="CNN", external_data=None):
            AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name)
            monthly_mean_external = self.create_monthly_mean_from_daily_data(external_data, columns=data.type.values, index=data.index)
            data = xr.concat([data, monthly_mean_external], dim="type")
            mean, sigma = suffix["mean"], suffix["sigma"]
            monthly_mean_external = self.create_monthly_mean_from_daily_data(external_data, columns=data.type.values)
            mean_external = monthly_mean_external.mean()
            sigma_external = np.sqrt(monthly_mean_external.var())
    
            # r_mu, p_mu = stats.spearmanr(data.loc[..., [observation_name, observation_name+'X']])
            r_mu, p_mu = stats.pearsonr(data.loc[..., observation_name], data.loc[..., observation_name + "X"])
    
            AIV = np.array(r_mu**2)
            BIV = ((r_mu - sigma_external / sigma.loc[observation_name])**2).values
            CIV = (((mean_external - mean.loc[observation_name]) / sigma.loc[observation_name])**2).values
            skill_score = np.array((AI - BI - CI - AIV + BIV + CIV) / (1 - AIV + BIV + CIV))
            return pd.DataFrame({"skill_score": [skill_score], "AIV": [AIV], "BIV": [BIV], "CIV": CIV}).to_xarray().to_array()
    
        @staticmethod
        def create_monthly_mean_from_daily_data(data, columns=None, index=None):
            if columns is None:
                columns = data.type.values
            if index is None:
                index = data.index
            coordinates = [index, [v + "X" for v in list(columns)]]
            empty_data = np.full((len(index), len(columns)), np.nan)
            monthly_mean = xr.DataArray(empty_data, coords=coordinates, dims=["index", "type"])
            mu = data.groupby("index.month").mean()
    
            for month in mu.month:
                monthly_mean[monthly_mean.index.dt.month == month, :] = mu[mu.month == month].values
    
            return monthly_mean