Commit 2575bd85 authored by lukas leufen 👻

revert many changes as the multiprocessing approach will not work on HPC systems with really big data due to memory issues
parent c40517da
Pipeline #101829 passed with stages in 10 minutes and 40 seconds
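For context on the memory issue named in the commit message: multiprocessing.Pool passes task arguments by pickling them into each worker process, so every in-flight apply_async call holds its own copy of the data. A minimal sketch of that effect (editor's illustration, not part of this commit):

import multiprocessing
import numpy as np

def f_proc(arr):
    # the worker operates on a deserialized *copy* of arr, not a view
    return float(arr.sum())

if __name__ == "__main__":
    # at HPC scale this array would be many GB; kept small here on purpose
    big = np.zeros((1_000, 1_000))
    with multiprocessing.Pool(4) as pool:
        # each apply_async pickles `big` again, so four in-flight tasks keep
        # four extra copies alive in addition to the parent's own
        results = [pool.apply_async(f_proc, args=(big,)) for _ in range(4)]
        print([r.get() for r in results])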
@@ -2,11 +2,7 @@ astropy==4.1
 bottleneck==1.3.2
 cached-property==1.5.2
 iniconfig==1.1.1
-multiprocess==0.70.11
 ordered-set==4.0.2
-pathos==0.2.7
-pox==0.2.9
-ppft==1.6.6.3
 pyshp==2.1.3
 pytest-html==3.1.1
 pytest-lazy-fixture==0.6.3
@@ -2,11 +2,7 @@ astropy==4.1
 bottleneck==1.3.2
 cached-property==1.5.2
 iniconfig==1.1.1
-multiprocess==0.70.11
 ordered-set==4.0.2
-pathos==0.2.7
-pox==0.2.9
-ppft==1.6.6.3
 pyshp==2.1.3
 pytest-html==3.1.1
 pytest-lazy-fixture==0.6.3
@@ -8,8 +8,6 @@ import logging
 import os
 import sys
 import traceback
-import multiprocessing
-import psutil
 from typing import Dict, Tuple, Union, List, Callable
 import numpy as np
@@ -709,109 +707,48 @@ class PostProcessing(RunEnvironment):
logging.info(f"start make_prediction for {subset_type}")
time_dimension = self.data_store.get("time_dim")
window_dim = self.data_store.get("window_dim")
frequency = self._get_frequency()
use_multiprocessing = self.data_store.get("use_multiprocessing")
max_process = self.data_store.get("max_number_multiprocessing")
n_process = min([psutil.cpu_count(logical=False), len(subset), max_process]) # use only physical cpus
if n_process > 1 and use_multiprocessing is True: # parallel solution
pool = multiprocessing.Pool(n_process)
logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
output = []
output_pre = []
pos = 0
for i, data in enumerate(subset):
output_pre.append(pool.apply_async(f_proc_load_data, args=(data, )))
if (i + 1) % (2 * n_process) == 0 or (i + 1) == len(subset):
for p in output_pre:
input_data, target_data, data = p.get()
nn_pred = self.model.predict(input_data, batch_size=512)
output.append(pool.apply_async(
f_proc_make_prediction,
args=(data, input_data, target_data, nn_pred, frequency, time_dimension, self.forecast_indicator,
self.observation_indicator, window_dim, self.ahead_dim, self.index_dim, self.model_type_dim,
self.forecast_path, subset_type, self.window_lead_time, self.ols_model)))
for p in output:
p.get()
logging.info(f"...finished: {subset[pos]} ({int((pos + 1.) / len(output) * 100)}%)")
pos += 1
output, output_pre = [], []
assert len(output) == 0
assert len(output_pre) == 0
pool.close()
pool.join()
# output_pre = [pool.apply_async(f_proc_load_data, args=(data, )) for data in subset]
# for p in output_pre:
# input_data, target_data, data = p.get()
# nn_pred = self.model.predict(input_data)
# output.append(pool.apply_async(
# f_proc_make_prediction,
# args=(data, input_data, target_data, nn_pred, frequency, time_dimension, self.forecast_indicator,
# self.observation_indicator, window_dim, self.ahead_dim, self.index_dim, self.model_type_dim,
# self.forecast_path, subset_type, self.window_lead_time, self.ols_model)))
# for i, p in enumerate(output):
# p.get()
# logging.info(f"...finished: {subset[i]} ({int((i + 1.) / len(output) * 100)}%)")
# pool.close()
# pool.join()
else: # serial solution
logging.info("use serial make prediction approach")
for i, data in enumerate(subset):
input_data, target_data = data.get_data(as_numpy=(True, False))
nn_pred = self.model.predict(input_data, batch_size=512)
f_proc_make_prediction(data, input_data, target_data, nn_pred, frequency, time_dimension, self.forecast_indicator,
self.observation_indicator, window_dim, self.ahead_dim, self.index_dim, self.model_type_dim,
self.forecast_path, subset_type, self.window_lead_time, self.ols_model)
logging.info(f"...finished: {data} ({int((i + 1.) / len(subset) * 100)}%)")
#
#
# for i, data in enumerate(subset):
# input_data = data.get_X()
# target_data = data.get_Y(as_numpy=False)
# observation_data = data.get_observation()
#
# # get scaling parameters
# transformation_func = data.apply_transformation
#
# for normalised in [True, False]:
# # create empty arrays
# nn_prediction, persistence_prediction, ols_prediction, observation = self._create_empty_prediction_arrays(
# target_data, count=4)
#
# # nn forecast
# nn_prediction = self._create_nn_forecast(input_data, nn_prediction, transformation_func, normalised)
#
# # persistence
# persistence_prediction = self._create_persistence_forecast(observation_data, persistence_prediction,
# transformation_func, normalised)
#
# # ols
# ols_prediction = self._create_ols_forecast(input_data, ols_prediction, transformation_func, normalised)
#
# # observation
# observation = self._create_observation(target_data, observation, transformation_func, normalised)
#
# # merge all predictions
# full_index = self.create_fullindex(observation_data.indexes[time_dimension], self._get_frequency())
# prediction_dict = {self.forecast_indicator: nn_prediction,
# "persi": persistence_prediction,
# self.observation_indicator: observation,
# "ols": ols_prediction}
# all_predictions = self.create_forecast_arrays(full_index, list(target_data.indexes[window_dim]),
# time_dimension, ahead_dim=self.ahead_dim,
# index_dim=self.index_dim, type_dim=self.model_type_dim,
# **prediction_dict)
#
# # save all forecasts locally
# prefix = "forecasts_norm" if normalised is True else "forecasts"
# file = os.path.join(self.forecast_path, f"{prefix}_{str(data)}_{subset_type}.nc")
# all_predictions.to_netcdf(file)
for i, data in enumerate(subset):
input_data = data.get_X()
target_data = data.get_Y(as_numpy=False)
observation_data = data.get_observation()
# get scaling parameters
transformation_func = data.apply_transformation
for normalised in [True, False]:
# create empty arrays
nn_prediction, persistence_prediction, ols_prediction, observation = self._create_empty_prediction_arrays(
target_data, count=4)
# nn forecast
nn_prediction = self._create_nn_forecast(input_data, nn_prediction, transformation_func, normalised)
# persistence
persistence_prediction = self._create_persistence_forecast(observation_data, persistence_prediction,
transformation_func, normalised)
# ols
ols_prediction = self._create_ols_forecast(input_data, ols_prediction, transformation_func, normalised)
# observation
observation = self._create_observation(target_data, observation, transformation_func, normalised)
# merge all predictions
full_index = self.create_fullindex(observation_data.indexes[time_dimension], self._get_frequency())
prediction_dict = {self.forecast_indicator: nn_prediction,
"persi": persistence_prediction,
self.observation_indicator: observation,
"ols": ols_prediction}
all_predictions = self.create_forecast_arrays(full_index, list(target_data.indexes[window_dim]),
time_dimension, ahead_dim=self.ahead_dim,
index_dim=self.index_dim, type_dim=self.model_type_dim,
**prediction_dict)
# save all forecasts locally
prefix = "forecasts_norm" if normalised is True else "forecasts"
file = os.path.join(self.forecast_path, f"{prefix}_{str(data)}_{subset_type}.nc")
all_predictions.to_netcdf(file)
def _get_frequency(self) -> str:
"""Get frequency abbreviation."""
@@ -1173,210 +1110,3 @@ class PostProcessing(RunEnvironment):
file_name = f"error_report_{metric}_{model_type}.%s".replace(' ', '_').replace('/', '_')
tables.save_to_tex(report_path, file_name % "tex", column_format=column_format, df=df)
tables.save_to_md(report_path, file_name % "md", df=df)
class MakePrediction:
def __init__(self, nn_pred, window_lead_time, ols_model):
self.nn_pred = nn_pred
self.window_lead_time = window_lead_time
self.ols_model = ols_model # must be copied maybe
@staticmethod
def _create_empty_prediction_arrays(target_data, count=1):
"""
Create array to collect all predictions. Expand target data by a station dimension. """
return [target_data.copy() for _ in range(count)]
def _create_nn_forecast(self, input_data: xr.DataArray, nn_prediction: xr.DataArray, transformation_func: Callable,
normalised: bool, use_multiprocessing: bool = False) -> xr.DataArray:
"""
Create NN forecast for given input data.
Inverse transformation is applied to the forecast to get the output in the original space. Furthermore, only the
output of the main branch is returned (not all minor branches, if the network has multiple output branches). The
main branch is defined to be the last entry of all outputs.
:param input_data: transposed history from DataPrep
:param nn_prediction: empty array in right shape to fill with data
:param transformation_func: a callable function to apply inverse transformation
:param normalised: transform prediction in original space if false, or use normalised predictions if true
:return: filled data array with nn predictions
"""
tmp_nn = self.nn_pred
if isinstance(tmp_nn, list):
nn_prediction.values = tmp_nn[-1]
elif tmp_nn.ndim == 3:
nn_prediction.values = tmp_nn[-1, ...]
elif tmp_nn.ndim == 2:
nn_prediction.values = tmp_nn
else:
raise NotImplementedError(f"Number of dimensions of model output must be 2 or 3, but not {tmp_nn.ndim}.")
if not normalised:
nn_prediction = transformation_func(nn_prediction, base="target", inverse=True)
return nn_prediction
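    # Editor's illustration (not part of the commit): a model with several
    # output branches returns a list from predict(), and tmp_nn[-1] -- the last
    # entry -- is taken as the main branch; a plain 2D array of shape
    # (samples, window_lead_time) is written into nn_prediction as-is.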
def _create_persistence_forecast(self, data, persistence_prediction: xr.DataArray, transformation_func: Callable,
normalised: bool) -> xr.DataArray:
"""
Create persistence forecast with given data.
Persistence is derived from the value at t=0 and applied to all following time steps (t+1, ..., t+window).
Inverse transformation is applied to the forecast to get the output in the original space.
:param data: observation
:param persistence_prediction: empty array in right shape to fill with data
:param transformation_func: a callable function to apply inverse transformation
:param normalised: transform prediction in original space if false, or use normalised predictions if true
:return: filled data array with persistence predictions
"""
tmp_persi = data.copy()
persistence_prediction.values = np.tile(tmp_persi, (self.window_lead_time, 1)).T
if not normalised:
persistence_prediction = transformation_func(persistence_prediction, "target", inverse=True)
return persistence_prediction
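    # Editor's illustration (not part of the commit): with observations
    # [a, b, c] and window_lead_time = 2, np.tile(..., (2, 1)).T yields
    # [[a, a], [b, b], [c, c]] -- the value at each index repeated for every
    # lead time.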
def _create_ols_forecast(self, input_data: xr.DataArray, ols_prediction: xr.DataArray,
transformation_func: Callable, normalised: bool) -> xr.DataArray:
"""
Create ordinary least square model forecast with given input data.
Inverse transformation is applied to the forecast to get the output in the original space.
:param input_data: transposed history from DataPrep
:param ols_prediction: empty array in right shape to fill with data
:param transformation_func: a callable function to apply inverse transformation
:param normalised: transform prediction in original space if false, or use normalised predictions if true
:return: filled data array with ols predictions
"""
tmp_ols = self.ols_model.predict(input_data)
target_shape = ols_prediction.values.shape
if target_shape != tmp_ols.shape:
if len(target_shape) == 2:
new_values = np.swapaxes(tmp_ols, 1, 0)
else:
new_values = np.swapaxes(tmp_ols, 2, 0)
else:
new_values = tmp_ols
ols_prediction.values = new_values
if not normalised:
ols_prediction = transformation_func(ols_prediction, "target", inverse=True)
return ols_prediction
def _create_observation(self, data, _, transformation_func: Callable, normalised: bool) -> xr.DataArray:
"""
Create observation as ground truth from given data.
Inverse transformation is applied to the ground truth to get the output in the original space.
:param data: observation
:param transformation_func: a callable function to apply inverse transformation
:param normalised: transform ground truth in original space if false, or use normalised predictions if true
:return: filled data array with observation
"""
if not normalised:
data = transformation_func(data, "target", inverse=True)
return data
@staticmethod
def create_fullindex(df: Union[xr.DataArray, pd.DataFrame, pd.DatetimeIndex], freq: str) -> pd.DataFrame:
"""
Create full index from first and last date inside df and resample with given frequency.
:param df: use time range of this data set
:param freq: frequency of full index
:return: empty data frame with full index.
"""
if isinstance(df, pd.DataFrame):
earliest = df.index[0]
latest = df.index[-1]
elif isinstance(df, xr.DataArray):
earliest = df.index[0].values
latest = df.index[-1].values
elif isinstance(df, pd.DatetimeIndex):
earliest = df[0]
latest = df[-1]
else:
raise AttributeError(f"unknown array type. Only pandas DataFrames, xarray DataArrays and pandas "
                     f"DatetimeIndexes are supported. Given type is {type(df)}.")
index = pd.DataFrame(index=pd.date_range(earliest, latest, freq=freq))
return index
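    # Editor's illustration (not part of the commit): for a daily series from
    # 2020-01-01 to 2020-01-05, create_fullindex(idx, "1D") returns an empty
    # DataFrame indexed by pd.date_range("2020-01-01", "2020-01-05", freq="1D"),
    # one row per day even if some days are missing from idx.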
@staticmethod
def create_forecast_arrays(index: pd.DataFrame, ahead_names: List[Union[str, int]], time_dimension,
ahead_dim="ahead", index_dim="index", type_dim="type", **kwargs):
"""
Combine different forecast types into single xarray.
:param index: index for forecasts (e.g. time)
:param ahead_names: names of ahead values (e.g. hours or days)
:param kwargs: as xarrays; data of forecasts
:return: xarray of dimension 3: index, ahead_names, # predictions
"""
keys = list(kwargs.keys())
res = xr.DataArray(np.full((len(index.index), len(ahead_names), len(keys)), np.nan),
coords=[index.index, ahead_names, keys], dims=[index_dim, ahead_dim, type_dim])
for k, v in kwargs.items():
intersection = set(res.index.values) & set(v.indexes[time_dimension].values)
match_index = np.array(list(intersection))
res.loc[match_index, :, k] = v.loc[match_index]
return res
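    # Editor's illustration (not part of the commit): forecasts are aligned to
    # the full index by intersecting date sets, so any date of the full index
    # missing from a given forecast is left as NaN in the combined array.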
def f_proc_load_data(data):
input_data, target_data = data.get_data(as_numpy=(True, False))
return input_data, target_data, data
def f_proc_make_prediction(data, input_data, target_data, nn_pred, frequency, time_dimension, forecast_indicator, observation_indicator, window_dim,
ahead_dim, index_dim, model_type_dim, forecast_path, subset_type, window_lead_time,
ols_model):
prediction_maker = MakePrediction(nn_pred, window_lead_time, ols_model)
observation_data = data.get_observation()
# get scaling parameters
transformation_func = data.apply_transformation
for normalised in [True, False]:
# create empty arrays
nn_pred, persi_pred, ols_pred, observation = prediction_maker._create_empty_prediction_arrays(target_data,
count=4)
# nn forecast
nn_pred = prediction_maker._create_nn_forecast(input_data, nn_pred, transformation_func, normalised)
# persistence
persi_pred = prediction_maker._create_persistence_forecast(observation_data, persi_pred,
transformation_func, normalised)
# ols
ols_pred = prediction_maker._create_ols_forecast(input_data, ols_pred, transformation_func, normalised)
# observation
observation = prediction_maker._create_observation(target_data, observation, transformation_func, normalised)
# merge all predictions
full_index = prediction_maker.create_fullindex(observation_data.indexes[time_dimension], frequency)
prediction_dict = {forecast_indicator: nn_pred,
"persi": persi_pred,
observation_indicator: observation,
"ols": ols_pred}
all_predictions = prediction_maker.create_forecast_arrays(full_index, list(target_data.indexes[window_dim]),
time_dimension, ahead_dim=ahead_dim,
index_dim=index_dim, type_dim=model_type_dim,
**prediction_dict)
# save all forecasts locally
prefix = "forecasts_norm" if normalised is True else "forecasts"
file = os.path.join(forecast_path, f"{prefix}_{str(data)}_{subset_type}.nc")
all_predictions.to_netcdf(file)
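The bounded fan-out in the removed driver above is the notable pattern: rather than submitting the whole subset at once, it drained the pool after every 2 * n_process submissions, capping how many pickled argument sets were in flight at a time. A self-contained sketch of that pattern (editor's illustration, not part of the commit; work() is a hypothetical stand-in for the real load/predict tasks):

import multiprocessing

def work(x):
    # hypothetical stand-in for f_proc_load_data / f_proc_make_prediction
    return x * x

if __name__ == "__main__":
    items = list(range(10))
    n_process = 2
    with multiprocessing.Pool(n_process) as pool:
        pending = []
        for i, item in enumerate(items):
            pending.append(pool.apply_async(work, args=(item,)))
            # drain after every 2 * n_process submissions or at the very end,
            # so at most 2 * n_process argument copies are alive at once
            if (i + 1) % (2 * n_process) == 0 or (i + 1) == len(items):
                for p in pending:
                    print(p.get())
                pending = []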
@@ -9,14 +9,10 @@ keras_nightly==2.5.0.dev2021032900
 locket==0.2.1
 matplotlib==3.3.4
 mock==4.0.3
-multiprocess==0.70.11
 netcdf4==1.5.8
 numpy==1.19.5
 pandas==1.1.5
 partd==1.2.0
-pathos==0.2.7
-pox==0.2.9
-ppft==1.6.6.3
 psutil==5.8.0
 pydot==1.4.2
 pytest==6.2.2