From 72a33ff1132593d6096beb0b262915d71c859702 Mon Sep 17 00:00:00 2001 From: leufen1 <l.leufen@fz-juelich.de> Date: Mon, 23 Nov 2020 16:58:56 +0100 Subject: [PATCH] missed to include the changes --- mlair/run_modules/pre_processing.py | 33 ++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py index 1a0f8b86..6548daa5 100644 --- a/mlair/run_modules/pre_processing.py +++ b/mlair/run_modules/pre_processing.py @@ -215,16 +215,24 @@ class PreProcessing(RunEnvironment): valid_stations = [] kwargs = self.data_store.create_args_dict(data_handler.requirements(), scope=set_name) - logging.info("-------start parallel loop------") - pool = multiprocessing.Pool(4) - output = [pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs) - for station in set_stations] - - for p in output: - dh, s = p.get() - if dh is not None: - collection.add(dh) - valid_stations.append(s) + if os.cpu_count() > 1: # parallel solution + logging.info("use parallel validate station approach") + pool = multiprocessing.Pool() + output = [ + pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs) + for station in set_stations] + for p in output: + dh, s = p.get() + if dh is not None: + collection.add(dh) + valid_stations.append(s) + else: # serial solution + logging.info("use serial validate station approach") + for station in set_stations: + dh, s = f_proc(data_handler, station, set_name, store_processed_data, **kwargs) + if dh is not None: + collection.add(dh) + valid_stations.append(s) logging.info(f"run for {t_outer} to check {len(set_stations)} station(s). Found {len(collection)}/" f"{len(set_stations)} valid stations.") @@ -271,6 +279,11 @@ class PreProcessing(RunEnvironment): def f_proc(data_handler, station, name_affix, store, **kwargs): + """ + Try to create a data handler for given arguments. 
If build fails, this station does not fulfil all requirements and + therefore f_proc will return None as indication. On a successful build, f_proc returns the built data handler and + the station that was used. This function must be implemented globally to work together with multiprocessing. + """ try: res = data_handler.build(station, name_affix=name_affix, store_processed_data=store, **kwargs) -- GitLab