From 3d35e8e7877f3df7ece3b360bc8f885382351101 Mon Sep 17 00:00:00 2001
From: leufen1 <l.leufen@fz-juelich.de>
Date: Mon, 22 Feb 2021 14:25:53 +0100
Subject: [PATCH] reduce parallel preprocessing to physical cpus

---
 mlair/data_handler/default_data_handler.py | 3 ++-
 mlair/run_modules/pre_processing.py        | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py
index 5a62731d..d85bb4b1 100644
--- a/mlair/data_handler/default_data_handler.py
+++ b/mlair/data_handler/default_data_handler.py
@@ -12,6 +12,7 @@ import shutil
 from functools import reduce
 from typing import Tuple, Union, List
 import multiprocessing
+import psutil
 
 import numpy as np
 import xarray as xr
@@ -270,7 +271,7 @@ class DefaultDataHandler(AbstractDataHandler):
 
         if multiprocessing.cpu_count() > 1:  # parallel solution
             logging.info("use parallel transformation approach")
-            pool = multiprocessing.Pool()
+            pool = multiprocessing.Pool(psutil.cpu_count(logical=False))  # use only physical cpus (if psutil returns None, Pool uses its default worker count)
             logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
             output = [
                 pool.apply_async(f_proc, args=(cls.data_handler_transformation, station), kwds=sp_keys)
diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py
index c40b20e2..0ca355c9 100644
--- a/mlair/run_modules/pre_processing.py
+++ b/mlair/run_modules/pre_processing.py
@@ -8,6 +8,7 @@ import os
 from typing import Tuple
 import multiprocessing
 import requests
+import psutil
 
 import numpy as np
 import pandas as pd
@@ -264,7 +265,7 @@ class PreProcessing(RunEnvironment):
 
         if multiprocessing.cpu_count() > 1:  # parallel solution
             logging.info("use parallel validate station approach")
-            pool = multiprocessing.Pool()
+            pool = multiprocessing.Pool(psutil.cpu_count(logical=False))  # use only physical cpus (if psutil returns None, Pool uses its default worker count)
             logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
             output = [
                 pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs)
-- 
GitLab