From 9806b13636a70fbb464581c496f24e30177ad703 Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Tue, 30 Mar 2021 14:57:29 +0200 Subject: [PATCH] default data handler and preprocessing support parameter use_multiprocessing. Parameter is set to False on DEBUG always. /close #297 on pipeline success --- mlair/configuration/defaults.py | 1 + mlair/data_handler/default_data_handler.py | 6 ++++-- mlair/run_modules/experiment_setup.py | 13 +++++++++++-- mlair/run_modules/pre_processing.py | 3 ++- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/mlair/configuration/defaults.py b/mlair/configuration/defaults.py index 04e441fe..8805acfc 100644 --- a/mlair/configuration/defaults.py +++ b/mlair/configuration/defaults.py @@ -53,6 +53,7 @@ DEFAULT_SAMPLING = "daily" DEFAULT_DATA_ORIGIN = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA", "temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA", "no": "", "no2": "", "o3": "", "pm10": "", "so2": ""} +DEFAULT_USE_MULTIPROCESSING = True def get_defaults(): diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index 2eceff32..87fc83b0 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -39,7 +39,8 @@ class DefaultDataHandler(AbstractDataHandler): def __init__(self, id_class: data_handler, experiment_path: str, min_length: int = 0, extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False, name_affix=None, - store_processed_data=True, iter_dim=DEFAULT_ITER_DIM, time_dim=DEFAULT_TIME_DIM): + store_processed_data=True, iter_dim=DEFAULT_ITER_DIM, time_dim=DEFAULT_TIME_DIM, + use_multiprocessing=True): super().__init__() self.id_class = id_class self.time_dim = time_dim @@ -49,6 +50,7 @@ class DefaultDataHandler(AbstractDataHandler): self._Y = None self._X_extreme = None self._Y_extreme = None + self._use_multiprocessing = use_multiprocessing _name_affix = str(f"{str(self.id_class)}_{name_affix}" if name_affix is not None else id(self)) self._save_file = os.path.join(experiment_path, "data", f"{_name_affix}.pickle") self._collection = self._create_collection() @@ -286,7 +288,7 @@ class DefaultDataHandler(AbstractDataHandler): new = opts.get(k) transformation_dict[i][var][k] = new if old is None else old.combine_first(new) - if multiprocessing.cpu_count() > 1: # parallel solution + if multiprocessing.cpu_count() > 1 and kwargs.get("use_multiprocessing", True) is True: # parallel solution logging.info("use parallel transformation approach") pool = multiprocessing.Pool( min([psutil.cpu_count(logical=False), len(set_stations), 16])) # use only physical cpus diff --git a/mlair/run_modules/experiment_setup.py b/mlair/run_modules/experiment_setup.py index 30672ecc..f51cee8a 100644 --- a/mlair/run_modules/experiment_setup.py +++ b/mlair/run_modules/experiment_setup.py @@ -4,6 +4,7 @@ __date__ = '2019-11-15' import argparse import logging import os +import sys from typing import Union, Dict, Any, List, Callable from mlair.configuration import path_config @@ -17,7 +18,8 @@ from mlair.configuration.defaults import DEFAULT_STATIONS, DEFAULT_VAR_ALL_DICT, DEFAULT_TRAIN_START, DEFAULT_TRAIN_END, DEFAULT_TRAIN_MIN_LENGTH, DEFAULT_VAL_START, DEFAULT_VAL_END, \ DEFAULT_VAL_MIN_LENGTH, DEFAULT_TEST_START, DEFAULT_TEST_END, DEFAULT_TEST_MIN_LENGTH, DEFAULT_TRAIN_VAL_MIN_LENGTH, \ DEFAULT_USE_ALL_STATIONS_ON_ALL_DATA_SETS, DEFAULT_EVALUATE_BOOTSTRAPS, DEFAULT_CREATE_NEW_BOOTSTRAPS, \ - DEFAULT_NUMBER_OF_BOOTSTRAPS, DEFAULT_PLOT_LIST, DEFAULT_SAMPLING, DEFAULT_DATA_ORIGIN, DEFAULT_ITER_DIM + DEFAULT_NUMBER_OF_BOOTSTRAPS, DEFAULT_PLOT_LIST, DEFAULT_SAMPLING, DEFAULT_DATA_ORIGIN, DEFAULT_ITER_DIM, \ + DEFAULT_USE_MULTIPROCESSING from mlair.data_handler import DefaultDataHandler from mlair.run_modules.run_environment import RunEnvironment from mlair.model_modules.fully_connected_networks import FCN_64_32_16 as VanillaModel @@ -228,7 +230,8 @@ class ExperimentSetup(RunEnvironment): number_of_bootstraps=None, create_new_bootstraps=None, data_path: str = None, batch_path: str = None, login_nodes=None, hpc_hosts=None, model=None, batch_size=None, epochs=None, data_handler=None, - data_origin: Dict = None, competitors: list = None, competitor_path: str = None, **kwargs): + data_origin: Dict = None, competitors: list = None, competitor_path: str = None, + use_multiprocessing: bool = None, **kwargs): # create run framework super().__init__() @@ -265,6 +268,12 @@ class ExperimentSetup(RunEnvironment): logging.info(f"Experiment path is: {experiment_path}") path_config.check_path_and_create(self.data_store.get("experiment_path")) + # host system setup + debug_mode = sys.gettrace() is not None + self._set_param("debug_mode", debug_mode) + use_multiprocessing = False if debug_mode is True else use_multiprocessing + self._set_param("use_multiprocessing", use_multiprocessing, default=DEFAULT_USE_MULTIPROCESSING) + # batch path (temporary) self._set_param("batch_path", batch_path, default=os.path.join(experiment_path, "batch_data")) diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py index 813873b8..f59a4e89 100644 --- a/mlair/run_modules/pre_processing.py +++ b/mlair/run_modules/pre_processing.py @@ -241,8 +241,9 @@ class PreProcessing(RunEnvironment): collection = DataCollection(name=set_name) valid_stations = [] kwargs = self.data_store.create_args_dict(data_handler.requirements(), scope=set_name) + use_multiprocessing = self.data_store.get("use_multiprocessing") - if multiprocessing.cpu_count() > 1: # parallel solution + if multiprocessing.cpu_count() > 1 and use_multiprocessing: # parallel solution logging.info("use parallel validate station approach") pool = multiprocessing.Pool( min([psutil.cpu_count(logical=False), len(set_stations), 16])) # use only physical cpus -- GitLab