post_processing.py
    __author__ = "Lukas Leufen, Felix Kleinert"
    __date__ = '2019-12-11'
    
    
    import logging
    import os
    
    import numpy as np
    import pandas as pd
    import xarray as xr
    import statsmodels.api as sm
    import keras
    from scipy import stats
    
    from src.run_modules.run_environment import RunEnvironment
    from src.data_handling.data_distributor import Distributor
    from src.data_handling.data_generator import DataGenerator
    from src.model_modules.linear_model import OrdinaryLeastSquaredModel
    from src import statistics
    from src import helpers
    from src.helpers import TimeTracking
    from src.plotting.postprocessing_plotting import plot_monthly_summary, plot_climsum_boxplot, plot_station_map, plot_conditional_quantiles
    from src.datastore import NameNotFoundInDataStore
    
    
    class PostProcessing(RunEnvironment):
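        """
        Post-processing of a trained model: creates forecasts for all test stations together with persistence, OLS
        and observational references, stores them as netCDF files, calculates (climatological) skill scores and
        runs the plotting routines.
        """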
    
        def __init__(self):
            super().__init__()
            self.model: keras.Model = self._load_model()
            self.ols_model = None
            self.batch_size: int = self.data_store.get("batch_size", "general.model")
            self.test_data: DataGenerator = self.data_store.get("generator", "general.test")
            self.test_data_distributed = Distributor(self.test_data, self.model, self.batch_size)
            self.train_data: DataGenerator = self.data_store.get("generator", "general.train")
            self.train_val_data: DataGenerator = self.data_store.get("generator", "general.train_val")
            self.plot_path: str = self.data_store.get("plot_path", "general")
            self.skill_scores = None
            # self._run()
    
        def _run(self):
            self.train_ols_model()
            preds_for_all_stations = self.make_prediction()
            self.skill_scores = self.calculate_skill_scores()  #TODO: stopped here, continue with plot routine. Think about if skill score should be saved locally and loaded for plotting or if self.skill_scores should be used
            self.plot()
    
        def _load_model(self):
            try:
                model = self.data_store.get("best_model", "general")
            except NameNotFoundInDataStore:
                logging.info("no model saved in data store. trying to load model from experiment")
                path = self.data_store.get("experiment_path", "general")
                name = f"{self.data_store.get('experiment_name', 'general')}_my_model.h5"
                model_name = os.path.join(path, name)
                model = keras.models.load_model(model_name)
            return model
    
        def plot(self):
            logging.debug("Run plotting routines...")
            path = self.data_store.get("forecast_path", "general")
            window_lead_time = self.data_store.get("window_lead_time", "general")
            target_var = self.data_store.get("target_var", "general")
    
            plot_conditional_quantiles(self.test_data.stations, plot_folder=self.plot_path, pred_name="CNN", ref_name="orig",
                                       forecast_path=path, plot_name_affix="cali-ref")
            plot_conditional_quantiles(self.test_data.stations, plot_folder=self.plot_path, pred_name="orig", ref_name="CNN",
                                       forecast_path=path, plot_name_affix="like-bas")
            plot_station_map(generators={'b': self.test_data}, plot_folder=self.plot_path)
            plot_monthly_summary(self.test_data.stations, path, r"forecasts_%s_test.nc", window_lead_time, target_var,
                                 plot_folder=self.plot_path)
            # plot_climsum_boxplot()
    
            # FKf.ss_climsum_boxplot(data=ss_clim_dic, modelsetup=modelsetup, score_only=True, **kwargs)
            # FKf.ss_climsum_boxplot(data=ss_clim_dic, modelsetup=modelsetup, score_only=False, extra_nametag='_all_terms', **kwargs)
            # FKf.ss_sum_boxplot(ss_dic, plot_path, modelsetup)
    
        def calculate_test_score(self):
            test_score = self.model.evaluate_generator(generator=self.test_data_distributed.distribute_on_batches(),
                                                       use_multiprocessing=False, verbose=0, steps=1)
            logging.info(f"test score = {test_score}")
            self._save_test_score(test_score)
    
        def _save_test_score(self, score):
            path = self.data_store.get("experiment_path", "general")
            with open(os.path.join(path, "test_scores.txt"), "w") as f:
                for index, item in enumerate(score):
                    f.write(f"{self.model.metrics[index]}, {item}\n")
    
        def train_ols_model(self):
            self.ols_model = OrdinaryLeastSquaredModel(self.train_data)
    
        def make_prediction(self, freq="1D"):
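            """
            Create forecasts for all test stations. For each station, the NN, persistence and OLS forecasts are
            calculated, transformed back to the original scale, merged with the observations into a single xarray
            and stored locally as netCDF file (forecasts_<station>_test.nc).

            :param freq: frequency of the full time index the forecasts are mapped onto (default "1D")
            :return: list of the NN forecasts of all stations
            """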
            logging.debug("start make_prediction")
            nn_prediction_all_stations = []
            for i, _ in enumerate(self.test_data):
                data = self.test_data.get_data_generator(i)
    
                nn_prediction, persistence_prediction, ols_prediction = self._create_empty_prediction_arrays(data, count=3)
                input_data = self.test_data[i][0]
    
                # get scaling parameters
                mean, std, transformation_method = data.get_transformation_information(variable='o3')
    
                # nn forecast
                nn_prediction = self._create_nn_forecast(input_data, nn_prediction, mean, std, transformation_method)
    
                # persistence
                persistence_prediction = self._create_persistence_forecast(input_data, persistence_prediction, mean, std, 
                                                                           transformation_method)
    
                # ols
                ols_prediction = self._create_ols_forecast(input_data, ols_prediction, mean, std, transformation_method)
    
                # orig pred
                orig_pred = self._create_orig_forecast(data, None, mean, std, transformation_method)
    
                # merge all predictions
                full_index = self.create_fullindex(data.data.indexes['datetime'], freq)
                all_predictions = self.create_forecast_arrays(full_index, list(data.label.indexes['window']),
                                                              CNN=nn_prediction,
                                                              persi=persistence_prediction,
                                                              orig=orig_pred,
                                                              OLS=ols_prediction)
    
                # save all forecasts locally
                path = self.data_store.get("forecast_path", "general")
                file = os.path.join(path, f"forecasts_{data.station[0]}_test.nc")
                all_predictions.to_netcdf(file)
    
                # save nn forecast to return variable
                nn_prediction_all_stations.append(nn_prediction)
            return nn_prediction_all_stations
    
        @staticmethod
        def _create_orig_forecast(data, _, mean, std, transformation_method):
            return statistics.apply_inverse_transformation(data.label, mean, std, transformation_method)
    
        def _create_ols_forecast(self, input_data, ols_prediction, mean, std, transformation_method):
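            """Apply the trained OLS model to the input data and inverse-transform the result to the original scale."""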
            tmp_ols = self.ols_model.predict(input_data)
            tmp_ols = statistics.apply_inverse_transformation(tmp_ols, mean, std, transformation_method)
            ols_prediction.values = np.swapaxes(np.expand_dims(tmp_ols, axis=1), 2, 0)
            return ols_prediction
    
        def _create_persistence_forecast(self, input_data, persistence_prediction, mean, std, transformation_method):
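            """
            Create the persistence forecast: the last observed o3 concentration (window 0) is repeated for all lead
            times and inverse-transformed to the original scale.
            """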
            tmp_persi = input_data.sel({'window': 0, 'variables': 'o3'})
            tmp_persi = statistics.apply_inverse_transformation(tmp_persi, mean, std, transformation_method)
            window_lead_time = self.data_store.get("window_lead_time", "general")
            persistence_prediction.values = np.expand_dims(np.tile(tmp_persi.squeeze('Stations'), (window_lead_time, 1)),
                                                           axis=1)
            return persistence_prediction
    
        def _create_nn_forecast(self, input_data, nn_prediction, mean, std, transformation_method):
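            """Apply the neural network to the input data and inverse-transform the result to the original scale."""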
            tmp_nn = self.model.predict(input_data)
            tmp_nn = statistics.apply_inverse_transformation(tmp_nn, mean, std, transformation_method)
            nn_prediction.values = np.swapaxes(np.expand_dims(tmp_nn, axis=1), 2, 0)
            return nn_prediction
    
        @staticmethod
        def _create_empty_prediction_arrays(generator, count=1):
            # build independent copies; ``[generator.label.copy()] * count`` would return ``count`` references to
            # the same object, so writing to one prediction array would alter all of them
            return [generator.label.copy() for _ in range(count)]
    
        @staticmethod
        def create_fullindex(df, freq):
            """
            Create an empty DataFrame whose index covers the full range between the first and the last date in df
            with frequency freq.

            :param df: pandas DataFrame, xarray DataArray or pandas DatetimeIndex to take the date range from
            :param freq: frequency string (e.g. "1D")
            :return: empty pandas DataFrame with the full DatetimeIndex
            """
            if isinstance(df, pd.DataFrame):
                earliest = df.index[0]
                latest = df.index[-1]
            elif isinstance(df, xr.DataArray):
                earliest = df.index[0].values
                latest = df.index[-1].values
            elif isinstance(df, pd.core.indexes.datetimes.DatetimeIndex):
                earliest = df[0]
                latest = df[-1]
            else:
                raise AttributeError(f"unknown array type. Only pandas DataFrames, xarray DataArrays and pandas "
                                     f"DatetimeIndexes are supported. Given type is {type(df)}.")
            index = pd.DataFrame(index=pd.date_range(earliest, latest, freq=freq))
            return index
    
        @staticmethod
        def create_forecast_arrays(index, ahead_names, **kwargs):
            """
            This function combines different forecast types into one xarray.
    
            :param index: as index; index for forecasts (e.g. time)
            :param ahead_names: as list of str/int: names of ahead values (e.g. hours or days)
            :param kwargs: as xarrays; data of forecasts
            :return: xarray of dimension 3: index, ahead_names, # predictions
    
            """
            keys = list(kwargs.keys())
            res = xr.DataArray(np.full((len(index.index), len(ahead_names), len(keys)), np.nan),
                               coords=[index.index, ahead_names, keys], dims=['index', 'ahead', 'type'])
            for k, v in kwargs.items():
                try:
                    match_index = np.stack(set(res.index.values) & set(v.index.values))
                    res.loc[match_index, :, k] = v.loc[match_index]
                except AttributeError:  # v is xarray type and has no attribute .index
                    match_index = np.stack(set(res.index.values) & set(v.indexes['datetime'].values))
                    res.loc[match_index, :, k] = v.sel({'datetime': match_index}).squeeze('Stations').transpose()
            return res
    
        def _get_external_data(self, station):
            try:
                data = self.train_val_data.get_data_generator(station)
                mean, std, transformation_method = data.get_transformation_information(variable='o3')
                external_data = self._create_orig_forecast(data, None, mean, std, transformation_method)
                external_data = external_data.squeeze("Stations").sel(window=1).drop(["window", "Stations", "variables"])
                return external_data.rename({'datetime': 'index'})
            except KeyError:
                return None
    
        def calculate_skill_scores(self, threshold=60):
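            """
            Calculate general and climatological skill scores for all test stations based on the locally stored
            forecast files.

            :param threshold: currently unused
            :return: two dicts keyed by station: general skill scores and climatological skill scores
            """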
            path = self.data_store.get("forecast_path", "general")
            window_lead_time = self.data_store.get("window_lead_time", "general")
            skill_score_general = {}
            skill_score_climatological = {}
            for station in self.test_data.stations:  # TODO: replace this by a more general approach to also calculate on train/val
                file = os.path.join(path, f"forecasts_{station}_test.nc")
                data = xr.open_dataarray(file)
                ss = SkillScores(data)
                external_data = self._get_external_data(station)
                skill_score_general[station] = ss.skill_scores(window_lead_time)
                skill_score_climatological[station] = ss.climatological_skill_scores(external_data, window_lead_time)
    
            return skill_score_general, skill_score_climatological
    
    
    class SkillScores(RunEnvironment):
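        """
        Calculate skill scores on the forecast arrays created during post-processing: pairwise MSE-based skill
        scores between CNN, persistence and OLS forecasts, and climatological skill scores decomposed into the
        terms AI to CIV for the four mu cases.
        """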
    
        def __init__(self, internal_data):
            super().__init__()
            self.internal_data = internal_data
    
        def skill_scores(self, window_lead_time):
            ahead_names = list(range(1, window_lead_time + 1))
            skill_score = pd.DataFrame(index=['cnn-persi', 'ols-persi', 'cnn-ols'])
            for iahead in ahead_names:
                data = self.internal_data.sel(ahead=iahead)
                skill_score[iahead] = [self.general_skill_score(data),
                                       self.general_skill_score(data, forecast_name="OLS"),
                                       self.general_skill_score(data, reference_name="OLS")]
            return skill_score
    
        def climatological_skill_scores(self, external_data, window_lead_time):
            ahead_names = list(range(1, window_lead_time + 1))
    
            all_terms = ['AI', 'AII', 'AIII', 'AIV', 'BI', 'BII', 'BIV', 'CI', 'CIV', 'CASE I', 'CASE II', 'CASE III',
                         'CASE IV']
            skill_score = xr.DataArray(np.full((len(all_terms), len(ahead_names)), np.nan),
                                       coords=[all_terms, ahead_names], dims=['terms', 'ahead'])
    
            for iahead in ahead_names:
    
                data = self.internal_data.sel(ahead=iahead)
    
                skill_score.loc[["CASE I", "AI", "BI", "CI"], iahead] = np.stack(self._climatological_skill_score(
                    data, mu_type=1, forecast_name="CNN").values.flatten())
    
                skill_score.loc[["CASE II", "AII", "BII"], iahead] = np.stack(self._climatological_skill_score(
                    data, mu_type=2, forecast_name="CNN").values.flatten())
    
                if external_data is not None:
                    skill_score.loc[["CASE III", "AIII"], iahead] = np.stack(self._climatological_skill_score(
                        data, mu_type=3, forecast_name="CNN",
                        external_data=external_data).values.flatten())
    
                    skill_score.loc[["CASE IV", "AIV", "BIV", "CIV"], iahead] = np.stack(self._climatological_skill_score(
                        data, mu_type=4, forecast_name="CNN",
                        external_data=external_data).values.flatten())
    
            return skill_score
    
        def _climatological_skill_score(self, data, mu_type=1, observation_name="orig", forecast_name="CNN", external_data=None):
            kwargs = {"external_data": external_data} if external_data is not None else {}
            return getattr(self, f"skill_score_mu_case_{mu_type}")(data, observation_name, forecast_name, **kwargs)
    
        @staticmethod
        def general_skill_score(data, observation_name="orig", forecast_name="CNN", reference_name="persi"):
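            """
            Calculate the MSE-based skill score of a forecast relative to a reference:
            SS = 1 - MSE(observation, forecast) / MSE(observation, reference).
            """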
            data = data.dropna("index")
            observation = data.loc[..., observation_name]
            forecast = data.loc[..., forecast_name]
            reference = data.loc[..., reference_name]
            mse = statistics.mean_squared_error
            skill_score = 1 - mse(observation, forecast) / mse(observation, reference)
            return skill_score
    
        @staticmethod
        def skill_score_pre_calculations(data, observation_name, forecast_name):
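            """
            Calculate the terms shared by all climatological skill score cases: AI (squared Spearman correlation of
            forecast and observation), BI (squared difference between this correlation and the ratio of forecast to
            observation variance) and CI (squared mean difference scaled by the observation variance).

            :return: AI, BI, CI, the data restricted to observation and forecast without NaNs, and a dict holding
                mean, variance, correlation and p-value
            """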
    
            data = data.loc[..., [observation_name, forecast_name]].drop("ahead")
            data = data.dropna("index")
    
            mean = data.mean("index")
            var = data.var("index")
            r, p = stats.spearmanr(data.loc[..., [forecast_name, observation_name]])
    
            AI = np.array(r ** 2)
            BI = ((r - var.loc[..., forecast_name] / var.loc[..., observation_name]) ** 2).values
            CI = (((mean.loc[..., forecast_name] - mean.loc[..., observation_name]) / var.loc[
                ..., observation_name]) ** 2).values
    
            suffix = {"mean": mean, "var": var, "r": r, "p": p}
            return AI, BI, CI, data, suffix
    
        def skill_score_mu_case_1(self, data, observation_name="orig", forecast_name="CNN"):
            AI, BI, CI, data, _ = self.skill_score_pre_calculations(data, observation_name, forecast_name)
            skill_score = np.array(AI - BI - CI)
            return pd.DataFrame({"skill_score": [skill_score], "AI": [AI], "BI": [BI], "CI": [CI]}).to_xarray().to_array()
    
        def skill_score_mu_case_2(self, data, observation_name="orig", forecast_name="CNN"):
            AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name)
            monthly_mean = self.create_monthly_mean_from_daily_data(data)
            data = xr.concat([data, monthly_mean], dim="type")
            var = data.var("index")
            r, p = stats.spearmanr(data.loc[..., [observation_name, observation_name + "X"]])
            AII = np.array(r ** 2)
            BII = ((r - var.loc[observation_name + 'X'] / var.loc[observation_name]) ** 2).values
            skill_score = np.array((AI - BI - CI - AII + BII) / (1 - AII + BII))
            return pd.DataFrame({"skill_score": [skill_score], "AII": [AII], "BII": [BII]}).to_xarray().to_array()
    
        def skill_score_mu_case_3(self, data, observation_name="orig", forecast_name="CNN", external_data=None):
            AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name)
            mean, var = suffix["mean"], suffix["var"]
            AIII = (((external_data.mean().values - mean.loc[observation_name]) / var.loc[observation_name])**2).values
            skill_score = np.array((AI - BI - CI + AIII) / (1 + AIII))
            return pd.DataFrame({"skill_score": [skill_score], "AIII": [AIII]}).to_xarray().to_array()
    
        def skill_score_mu_case_4(self, data, observation_name="orig", forecast_name="CNN", external_data=None):
            AI, BI, CI, data, suffix = self.skill_score_pre_calculations(data, observation_name, forecast_name)
            monthly_mean_external = self.create_monthly_mean_from_daily_data(external_data, columns=data.type.values)
            data = xr.concat([data, monthly_mean_external], dim="type")
            mean = data.mean("index")
            var = data.var("index")
    
            r_mu, p_mu = stats.spearmanr(data.loc[..., [observation_name, observation_name+'X']])
    
            AIV = np.array(r_mu**2)
            BIV = ((r_mu - var.loc[observation_name + 'X'] / var.loc[observation_name])**2).values
            CIV = (((mean.loc[observation_name + 'X'] - mean.loc[observation_name]) / var.loc[observation_name])**2).values
            skill_score = np.array((AI - BI - CI - AIV + BIV + CIV) / (1 - AIV + BIV + CIV))
            return pd.DataFrame({"skill_score": [skill_score], "AIV": [AIV], "BIV": [BIV], "CIV": [CIV]}).to_xarray().to_array()
    
        @staticmethod
        def create_monthly_mean_from_daily_data(data, columns=None):
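            """
            Calculate monthly means from daily data and map them back onto the daily index. The resulting columns
            are named like the input columns with a trailing "X".
            """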
            if columns is None:
                columns = data.type.values
            coordinates = [data.index, [v + "X" for v in list(columns)]]  # TODO
            empty_data = np.full((len(data.index), len(columns)), np.nan)
            monthly_mean = xr.DataArray(empty_data, coords=coordinates, dims=["index", "type"])
            mu = data.groupby("index.month").mean()
    
            for month in mu.month:
                monthly_mean[monthly_mean.index.dt.month == month, :] = mu[mu.month == month].values
    
            return monthly_mean