From 2575bd853c83c85d17abb37a7f023b0f658ce906 Mon Sep 17 00:00:00 2001
From: leufen1 <l.leufen@fz-juelich.de>
Date: Wed, 1 Jun 2022 16:43:20 +0200
Subject: [PATCH] revert multiprocessing changes, as this approach will not
 work on HPC systems with very large data due to memory issues

---
 HPC_setup/requirements_HDFML_additionals.txt  |   4 -
 HPC_setup/requirements_JUWELS_additionals.txt |   4 -
 mlair/run_modules/post_processing.py          | 352 ++----------------
 requirements.txt                              |   4 -
 4 files changed, 41 insertions(+), 323 deletions(-)

diff --git a/HPC_setup/requirements_HDFML_additionals.txt b/HPC_setup/requirements_HDFML_additionals.txt
index e811df8b..ebfac3cd 100644
--- a/HPC_setup/requirements_HDFML_additionals.txt
+++ b/HPC_setup/requirements_HDFML_additionals.txt
@@ -2,11 +2,7 @@ astropy==4.1
 bottleneck==1.3.2
 cached-property==1.5.2
 iniconfig==1.1.1
-multiprocess==0.70.11
 ordered-set==4.0.2
-pathos==0.2.7
-pox==0.2.9
-ppft==1.6.6.3
 pyshp==2.1.3
 pytest-html==3.1.1
 pytest-lazy-fixture==0.6.3
diff --git a/HPC_setup/requirements_JUWELS_additionals.txt b/HPC_setup/requirements_JUWELS_additionals.txt
index e811df8b..ebfac3cd 100644
--- a/HPC_setup/requirements_JUWELS_additionals.txt
+++ b/HPC_setup/requirements_JUWELS_additionals.txt
@@ -2,11 +2,7 @@ astropy==4.1
 bottleneck==1.3.2
 cached-property==1.5.2
 iniconfig==1.1.1
-multiprocess==0.70.11
 ordered-set==4.0.2
-pathos==0.2.7
-pox==0.2.9
-ppft==1.6.6.3
 pyshp==2.1.3
 pytest-html==3.1.1
 pytest-lazy-fixture==0.6.3
diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py
index a5e70ba4..00d82f3c 100644
--- a/mlair/run_modules/post_processing.py
+++ b/mlair/run_modules/post_processing.py
@@ -8,8 +8,6 @@ import logging
 import os
 import sys
 import traceback
-import multiprocessing
-import psutil
 from typing import Dict, Tuple, Union, List, Callable
 
 import numpy as np
@@ -709,109 +707,48 @@ class PostProcessing(RunEnvironment):
         logging.info(f"start make_prediction for {subset_type}")
         time_dimension = self.data_store.get("time_dim")
         window_dim = self.data_store.get("window_dim")
-        frequency = self._get_frequency()
-        use_multiprocessing = self.data_store.get("use_multiprocessing")
-        max_process = self.data_store.get("max_number_multiprocessing")
-        n_process = min([psutil.cpu_count(logical=False), len(subset), max_process])  # use only physical cpus
-        if n_process > 1 and use_multiprocessing is True:  # parallel solution
-            pool = multiprocessing.Pool(n_process)
-            logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
-
-
-            output = []
-            output_pre = []
-            pos = 0
-            for i, data in enumerate(subset):
-                output_pre.append(pool.apply_async(f_proc_load_data, args=(data, )))
-                if (i + 1) % (2 * n_process) == 0 or (i + 1) == len(subset):
-                    for p in output_pre:
-                        input_data, target_data, data = p.get()
-                        nn_pred = self.model.predict(input_data, batch_size=512)
-                        output.append(pool.apply_async(
-                            f_proc_make_prediction,
-                            args=(data, input_data, target_data, nn_pred, frequency, time_dimension, self.forecast_indicator,
-                                  self.observation_indicator, window_dim, self.ahead_dim, self.index_dim, self.model_type_dim,
-                                  self.forecast_path, subset_type, self.window_lead_time, self.ols_model)))
-                    for p in output:
-                        p.get()
-                        logging.info(f"...finished: {subset[pos]} ({int((pos + 1.) / len(output) * 100)}%)")
-                        pos += 1
-                    output, output_pre = [], []
-            assert len(output) == 0
-            assert len(output_pre) == 0
-            pool.close()
-            pool.join()
-
-
-
-            # output_pre = [pool.apply_async(f_proc_load_data, args=(data, )) for data in subset]
-            # for p in output_pre:
-            #     input_data, target_data, data = p.get()
-            #     nn_pred = self.model.predict(input_data)
-            #     output.append(pool.apply_async(
-            #         f_proc_make_prediction,
-            #         args=(data, input_data, target_data, nn_pred, frequency, time_dimension, self.forecast_indicator,
-            #               self.observation_indicator, window_dim, self.ahead_dim, self.index_dim, self.model_type_dim,
-            #               self.forecast_path, subset_type, self.window_lead_time, self.ols_model)))
-            # for i, p in enumerate(output):
-            #     p.get()
-            #     logging.info(f"...finished: {subset[i]} ({int((i + 1.) / len(output) * 100)}%)")
-            # pool.close()
-            # pool.join()
-        else:  # serial solution
-            logging.info("use serial make prediction approach")
-            for i, data in enumerate(subset):
-                input_data, target_data = data.get_data(as_numpy=(True, False))
-                nn_pred = self.model.predict(input_data, batch_size=512)
-                f_proc_make_prediction(data, input_data, target_data, nn_pred, frequency, time_dimension, self.forecast_indicator,
-                                       self.observation_indicator, window_dim, self.ahead_dim, self.index_dim, self.model_type_dim,
-                                       self.forecast_path, subset_type, self.window_lead_time, self.ols_model)
-                logging.info(f"...finished: {data} ({int((i + 1.) / len(subset) * 100)}%)")
-
-        #
-        #
-        # for i, data in enumerate(subset):
-        #     input_data = data.get_X()
-        #     target_data = data.get_Y(as_numpy=False)
-        #     observation_data = data.get_observation()
-        #
-        #     # get scaling parameters
-        #     transformation_func = data.apply_transformation
-        #
-        #     for normalised in [True, False]:
-        #         # create empty arrays
-        #         nn_prediction, persistence_prediction, ols_prediction, observation = self._create_empty_prediction_arrays(
-        #             target_data, count=4)
-        #
-        #         # nn forecast
-        #         nn_prediction = self._create_nn_forecast(input_data, nn_prediction, transformation_func, normalised)
-        #
-        #         # persistence
-        #         persistence_prediction = self._create_persistence_forecast(observation_data, persistence_prediction,
-        #                                                                    transformation_func, normalised)
-        #
-        #         # ols
-        #         ols_prediction = self._create_ols_forecast(input_data, ols_prediction, transformation_func, normalised)
-        #
-        #         # observation
-        #         observation = self._create_observation(target_data, observation, transformation_func, normalised)
-        #
-        #         # merge all predictions
-        #         full_index = self.create_fullindex(observation_data.indexes[time_dimension], self._get_frequency())
-        #         prediction_dict = {self.forecast_indicator: nn_prediction,
-        #                            "persi": persistence_prediction,
-        #                            self.observation_indicator: observation,
-        #                            "ols": ols_prediction}
-        #         all_predictions = self.create_forecast_arrays(full_index, list(target_data.indexes[window_dim]),
-        #                                                       time_dimension, ahead_dim=self.ahead_dim,
-        #                                                       index_dim=self.index_dim, type_dim=self.model_type_dim,
-        #                                                       **prediction_dict)
-        #
-        #         # save all forecasts locally
-        #         prefix = "forecasts_norm" if normalised is True else "forecasts"
-        #         file = os.path.join(self.forecast_path, f"{prefix}_{str(data)}_{subset_type}.nc")
-        #         all_predictions.to_netcdf(file)
+        for i, data in enumerate(subset):
+            input_data = data.get_X()
+            target_data = data.get_Y(as_numpy=False)
+            observation_data = data.get_observation()
+
+            # get scaling parameters
+            transformation_func = data.apply_transformation
+
+            for normalised in [True, False]:
+                # create empty arrays
+                nn_prediction, persistence_prediction, ols_prediction, observation = self._create_empty_prediction_arrays(
+                    target_data, count=4)
+
+                # nn forecast
+                nn_prediction = self._create_nn_forecast(input_data, nn_prediction, transformation_func, normalised)
+
+                # persistence
+                persistence_prediction = self._create_persistence_forecast(observation_data, persistence_prediction,
+                                                                           transformation_func, normalised)
+
+                # ols
+                ols_prediction = self._create_ols_forecast(input_data, ols_prediction, transformation_func, normalised)
+
+                # observation
+                observation = self._create_observation(target_data, observation, transformation_func, normalised)
+
+                # merge all predictions
+                full_index = self.create_fullindex(observation_data.indexes[time_dimension], self._get_frequency())
+                prediction_dict = {self.forecast_indicator: nn_prediction,
+                                   "persi": persistence_prediction,
+                                   self.observation_indicator: observation,
+                                   "ols": ols_prediction}
+                all_predictions = self.create_forecast_arrays(full_index, list(target_data.indexes[window_dim]),
+                                                              time_dimension, ahead_dim=self.ahead_dim,
+                                                              index_dim=self.index_dim, type_dim=self.model_type_dim,
+                                                              **prediction_dict)
+
+                # save all forecasts locally
+                prefix = "forecasts_norm" if normalised is True else "forecasts"
+                file = os.path.join(self.forecast_path, f"{prefix}_{str(data)}_{subset_type}.nc")
+                all_predictions.to_netcdf(file)
 
     def _get_frequency(self) -> str:
         """Get frequency abbreviation."""
@@ -1173,210 +1110,3 @@ class PostProcessing(RunEnvironment):
         file_name = f"error_report_{metric}_{model_type}.%s".replace(' ', '_').replace('/', '_')
         tables.save_to_tex(report_path, file_name % "tex", column_format=column_format, df=df)
         tables.save_to_md(report_path, file_name % "md", df=df)
-
-
-class MakePrediction:
-
-    def __init__(self, nn_pred, window_lead_time, ols_model):
-        self.nn_pred = nn_pred
-        self.window_lead_time = window_lead_time
-        self.ols_model = ols_model  # must be copied maybe
-
-    @staticmethod
-    def _create_empty_prediction_arrays(target_data, count=1):
-        """
-        Create array to collect all predictions. Expand target data by a station dimension.
-        """
-        return [target_data.copy() for _ in range(count)]
-
-    def _create_nn_forecast(self, input_data: xr.DataArray, nn_prediction: xr.DataArray, transformation_func: Callable,
-                            normalised: bool, use_multiprocessing: bool = False) -> xr.DataArray:
-        """
-        Create NN forecast for given input data.
-
-        Inverse transformation is applied to the forecast to get the output in the original space. Furthermore, only the
-        output of the main branch is returned (not all minor branches, if the network has multiple output branches). The
-        main branch is defined to be the last entry of all outputs.
-
-        :param input_data: transposed history from DataPrep
-        :param nn_prediction: empty array in right shape to fill with data
-        :param transformation_func: a callable function to apply inverse transformation
-        :param normalised: transform prediction in original space if false, or use normalised predictions if true
-
-        :return: filled data array with nn predictions
-        """
-        tmp_nn = self.nn_pred
-        if isinstance(tmp_nn, list):
-            nn_prediction.values = tmp_nn[-1]
-        elif tmp_nn.ndim == 3:
-            nn_prediction.values = tmp_nn[-1, ...]
-        elif tmp_nn.ndim == 2:
-            nn_prediction.values = tmp_nn
-        else:
-            raise NotImplementedError(f"Number of dimension of model output must be 2 or 3, but not {tmp_nn.dims}.")
-        if not normalised:
-            nn_prediction = transformation_func(nn_prediction, base="target", inverse=True)
-        return nn_prediction
-
-    def _create_persistence_forecast(self, data, persistence_prediction: xr.DataArray, transformation_func: Callable,
-                                     normalised: bool) -> xr.DataArray:
-        """
-        Create persistence forecast with given data.
-
-        Persistence is deviated from the value at t=0 and applied to all following time steps (t+1, ..., t+window).
-        Inverse transformation is applied to the forecast to get the output in the original space.
-
-        :param data: observation
-        :param persistence_prediction: empty array in right shape to fill with data
-        :param transformation_func: a callable function to apply inverse transformation
-        :param normalised: transform prediction in original space if false, or use normalised predictions if true
-
-        :return: filled data array with persistence predictions
-        """
-        tmp_persi = data.copy()
-        persistence_prediction.values = np.tile(tmp_persi, (self.window_lead_time, 1)).T
-        if not normalised:
-            persistence_prediction = transformation_func(persistence_prediction, "target", inverse=True)
-        return persistence_prediction
-
-    def _create_ols_forecast(self, input_data: xr.DataArray, ols_prediction: xr.DataArray,
-                             transformation_func: Callable, normalised: bool) -> xr.DataArray:
-        """
-        Create ordinary least square model forecast with given input data.
-
-        Inverse transformation is applied to the forecast to get the output in the original space.
-
-        :param input_data: transposed history from DataPrep
-        :param ols_prediction: empty array in right shape to fill with data
-        :param transformation_func: a callable function to apply inverse transformation
-        :param normalised: transform prediction in original space if false, or use normalised predictions if true
-
-        :return: filled data array with ols predictions
-        """
-        tmp_ols = self.ols_model.predict(input_data)
-        target_shape = ols_prediction.values.shape
-        if target_shape != tmp_ols.shape:
-            if len(target_shape) == 2:
-                new_values = np.swapaxes(tmp_ols, 1, 0)
-            else:
-                new_values = np.swapaxes(tmp_ols, 2, 0)
-        else:
-            new_values = tmp_ols
-        ols_prediction.values = new_values
-        if not normalised:
-            ols_prediction = transformation_func(ols_prediction, "target", inverse=True)
-        return ols_prediction
-
-    def _create_observation(self, data, _, transformation_func: Callable, normalised: bool) -> xr.DataArray:
-        """
-        Create observation as ground truth from given data.
-
-        Inverse transformation is applied to the ground truth to get the output in the original space.
-
-        :param data: observation
-        :param transformation_func: a callable function to apply inverse transformation
-        :param normalised: transform ground truth in original space if false, or use normalised predictions if true
-
-        :return: filled data array with observation
-        """
-        if not normalised:
-            data = transformation_func(data, "target", inverse=True)
-        return data
-
-    @staticmethod
-    def create_fullindex(df: Union[xr.DataArray, pd.DataFrame, pd.DatetimeIndex], freq: str) -> pd.DataFrame:
-        """
-        Create full index from first and last date inside df and resample with given frequency.
-
-        :param df: use time range of this data set
-        :param freq: frequency of full index
-
-        :return: empty data frame with full index.
- """ - if isinstance(df, pd.DataFrame): - earliest = df.index[0] - latest = df.index[-1] - elif isinstance(df, xr.DataArray): - earliest = df.index[0].values - latest = df.index[-1].values - elif isinstance(df, pd.DatetimeIndex): - earliest = df[0] - latest = df[-1] - else: - raise AttributeError(f"unknown array type. Only pandas dataframes, xarray dataarrays and pandas datetimes " - f"are supported. Given type is {type(df)}.") - index = pd.DataFrame(index=pd.date_range(earliest, latest, freq=freq)) - return index - - @staticmethod - def create_forecast_arrays(index: pd.DataFrame, ahead_names: List[Union[str, int]], time_dimension, - ahead_dim="ahead", index_dim="index", type_dim="type", **kwargs): - """ - Combine different forecast types into single xarray. - - :param index: index for forecasts (e.g. time) - :param ahead_names: names of ahead values (e.g. hours or days) - :param kwargs: as xarrays; data of forecasts - - :return: xarray of dimension 3: index, ahead_names, # predictions - - """ - keys = list(kwargs.keys()) - res = xr.DataArray(np.full((len(index.index), len(ahead_names), len(keys)), np.nan), - coords=[index.index, ahead_names, keys], dims=[index_dim, ahead_dim, type_dim]) - for k, v in kwargs.items(): - intersection = set(res.index.values) & set(v.indexes[time_dimension].values) - match_index = np.array(list(intersection)) - res.loc[match_index, :, k] = v.loc[match_index] - return res - - -def f_proc_load_data(data): - input_data, target_data = data.get_data(as_numpy=(True, False)) - return input_data, target_data, data - - -def f_proc_make_prediction(data, input_data, target_data, nn_pred, frequency, time_dimension, forecast_indicator, observation_indicator, window_dim, - ahead_dim, index_dim, model_type_dim, forecast_path, subset_type, window_lead_time, - ols_model): - - prediction_maker = MakePrediction(nn_pred, window_lead_time, ols_model) - - observation_data = data.get_observation() - - # get scaling parameters - transformation_func = data.apply_transformation - - for normalised in [True, False]: - - # create empty arrays - nn_pred, persi_pred, ols_pred, observation = prediction_maker._create_empty_prediction_arrays(target_data, - count=4) - - # nn forecast - nn_pred = prediction_maker._create_nn_forecast(input_data, nn_pred, transformation_func, normalised) - - # persistence - persi_pred = prediction_maker._create_persistence_forecast(observation_data, persi_pred, - transformation_func, normalised) - - # ols - ols_pred = prediction_maker._create_ols_forecast(input_data, ols_pred, transformation_func, normalised) - - # observation - observation = prediction_maker._create_observation(target_data, observation, transformation_func, normalised) - - # merge all predictions - full_index = prediction_maker.create_fullindex(observation_data.indexes[time_dimension], frequency) - prediction_dict = {forecast_indicator: nn_pred, - "persi": persi_pred, - observation_indicator: observation, - "ols": ols_pred} - all_predictions = prediction_maker.create_forecast_arrays(full_index, list(target_data.indexes[window_dim]), - time_dimension, ahead_dim=ahead_dim, - index_dim=index_dim, type_dim=model_type_dim, - **prediction_dict) - - # save all forecasts locally - prefix = "forecasts_norm" if normalised is True else "forecasts" - file = os.path.join(forecast_path, f"{prefix}_{str(data)}_{subset_type}.nc") - all_predictions.to_netcdf(file) diff --git a/requirements.txt b/requirements.txt index 1ba41d28..3afc17b6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,14 +9,10 @@ 
keras_nightly==2.5.0.dev2021032900 locket==0.2.1 matplotlib==3.3.4 mock==4.0.3 -multiprocess==0.70.11 netcdf4==1.5.8 numpy==1.19.5 pandas==1.1.5 partd==1.2.0 -pathos==0.2.7 -pox==0.2.9 -ppft==1.6.6.3 psutil==5.8.0 pydot==1.4.2 pytest==6.2.2 -- GitLab
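
Reviewer note, appended after the patch body rather than inside it: a minimal sketch of the memory issue named in the commit message, under the assumption that standard multiprocessing semantics apply. Every `apply_async` argument and result is pickled in the worker and unpickled in the parent, so the reverted parallel branch, which kept up to `2 * n_process` `f_proc_load_data` results in flight between synchronisation points, held each station's full input arrays in memory at least twice. The array shape, station count, and helper names below are illustrative, not taken from MLAir.

import multiprocessing
import os

import numpy as np
import psutil


def f_proc_load_data_sketch(_station):
    # Stand-in for f_proc_load_data: returns one station's "big" input arrays.
    # The result is pickled in the worker and unpickled in the parent process.
    return np.random.random((365, 24, 10, 100))  # roughly 70 MB per station


def rss_gb() -> float:
    # Resident set size of the current process, in GB.
    return psutil.Process(os.getpid()).memory_info().rss / 1024 ** 3


if __name__ == "__main__":
    print(f"before pool: {rss_gb():.2f} GB")
    with multiprocessing.Pool(4) as pool:
        # Keep many loads in flight at once, as the reverted parallel branch
        # did between its synchronisation points (2 * n_process tasks).
        results = [pool.apply_async(f_proc_load_data_sketch, args=(s,)) for s in range(32)]
        loaded = [r.get() for r in results]  # parent now holds all 32 station arrays
        print(f"after get(): {rss_gb():.2f} GB")  # grows by roughly 32 * 70 MB

With hundreds of stations of realistic size per batch, the parent's resident set grows by a large fraction of the full dataset per synchronisation window, which is consistent with the memory issues the commit message describes.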