Skip to content
Snippets Groups Projects
Commit 29b7e6c9 authored by leufen1's avatar leufen1
Browse files

try to make failing tests running

parent ed95118c
No related branches found
No related tags found
4 merge requests!432IOA works now also with xarray and on identical data, IOA is included in...,!431Resolve "release v2.1.0",!430update recent developments,!427Resolve "parallel make_prediction in postprocessing"
Pipeline #101672 passed
...@@ -10,7 +10,6 @@ import sys ...@@ -10,7 +10,6 @@ import sys
import traceback import traceback
import pathos import pathos
import multiprocess.context as ctx import multiprocess.context as ctx
ctx._force_start_method('spawn')
import psutil import psutil
from typing import Dict, Tuple, Union, List, Callable from typing import Dict, Tuple, Union, List, Callable
...@@ -699,6 +698,15 @@ class PostProcessing(RunEnvironment): ...@@ -699,6 +698,15 @@ class PostProcessing(RunEnvironment):
logging.info(f"start train_ols_model on train data") logging.info(f"start train_ols_model on train data")
self.ols_model = OrdinaryLeastSquaredModel(self.train_data) self.ols_model = OrdinaryLeastSquaredModel(self.train_data)
@staticmethod
def _get_ctx_context():
_default_context = ctx._default_context
if _default_context._actual_context is None:
_actual_context = _default_context._default_context._name
else:
_actual_context = ctx._default_context._actual_context._name
return _actual_context
@TimeTrackingWrapper @TimeTrackingWrapper
def make_prediction(self, subset): def make_prediction(self, subset):
""" """
...@@ -719,6 +727,8 @@ class PostProcessing(RunEnvironment): ...@@ -719,6 +727,8 @@ class PostProcessing(RunEnvironment):
n_process = min([psutil.cpu_count(logical=False), len(subset), max_process]) # use only physical cpus n_process = min([psutil.cpu_count(logical=False), len(subset), max_process]) # use only physical cpus
if n_process > 1 and use_multiprocessing is True: # parallel solution if n_process > 1 and use_multiprocessing is True: # parallel solution
logging.info("use parallel make prediction approach") logging.info("use parallel make prediction approach")
_actual_context = self._get_ctx_context()
ctx._force_start_method('spawn')
pool = pathos.multiprocessing.ProcessingPool(n_process) pool = pathos.multiprocessing.ProcessingPool(n_process)
logging.info(f"running {getattr(pool, 'ncpus')} processes in parallel") logging.info(f"running {getattr(pool, 'ncpus')} processes in parallel")
output = [ output = [
...@@ -732,6 +742,7 @@ class PostProcessing(RunEnvironment): ...@@ -732,6 +742,7 @@ class PostProcessing(RunEnvironment):
pool.close() pool.close()
pool.join() pool.join()
pool.clear() pool.clear()
ctx._force_start_method(_actual_context)
else: # serial solution else: # serial solution
logging.info("use serial make prediction approach") logging.info("use serial make prediction approach")
for i, data in enumerate(subset): for i, data in enumerate(subset):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment