diff --git a/mlair/configuration/path_config.py b/mlair/configuration/path_config.py index e7418b984dab74b0527b8dca05a9f6c3636ac18f..6b9c799ceb190b9150be3a4cfcd336eaf45aa768 100644 --- a/mlair/configuration/path_config.py +++ b/mlair/configuration/path_config.py @@ -3,6 +3,7 @@ import getpass import logging import os import re +import shutil import socket from typing import Union @@ -112,17 +113,23 @@ def set_bootstrap_path(bootstrap_path: str, data_path: str) -> str: return os.path.abspath(bootstrap_path) -def check_path_and_create(path: str) -> None: +def check_path_and_create(path: str, remove_existing: bool = False) -> None: """ Check a given path and create if not existing. :param path: path to check and create + :param remove_existing: if set to true an existing folder is removed and replaced by a new one (default False). """ try: os.makedirs(path) logging.debug(f"Created path: {path}") except FileExistsError: - logging.debug(f"Path already exists: {path}") + if remove_existing is True: + logging.debug(f"Remove / clean path: {path}") + shutil.rmtree(path) + check_path_and_create(path, remove_existing=False) + else: + logging.debug(f"Path already exists: {path}") def get_host(): diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index 3157248f03a0f72dbbbda5885f1c3e082a82b4c1..03858f8ba0e72bfdb186711757e94bcc08680301 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -8,6 +8,7 @@ import gc import logging import os import pickle +import random import dill import shutil from functools import reduce @@ -248,7 +249,7 @@ class DefaultDataHandler(AbstractDataHandler): d.coords[dim].values += np.timedelta64(*timedelta) @classmethod - def transformation(cls, set_stations, **kwargs): + def transformation(cls, set_stations, tmp_path=None, **kwargs): """ ### supported transformation methods @@ -309,15 +310,20 @@ class DefaultDataHandler(AbstractDataHandler): logging.info("use parallel transformation approach") pool = multiprocessing.Pool(n_process) # use only physical cpus logging.info(f"running {getattr(pool, '_processes')} processes in parallel") + sp_keys.update({"tmp_path": tmp_path, "return_strategy": "reference"}) output = [ pool.apply_async(f_proc, args=(cls.data_handler_transformation, station), kwds=sp_keys) for station in set_stations] for p in output: - dh, s = p.get() + _res_file, s = p.get() + with open(_res_file, "rb") as f: + dh = dill.load(f) + os.remove(_res_file) _inner() pool.close() else: # serial solution logging.info("use serial transformation approach") + sp_keys.update({"return_strategy": "result"}) for station in set_stations: dh, s = f_proc(cls.data_handler_transformation, station, **sp_keys) _inner() @@ -348,15 +354,22 @@ class DefaultDataHandler(AbstractDataHandler): return self.id_class.get_coordinates() -def f_proc(data_handler, station, **sp_keys): +def f_proc(data_handler, station, return_strategy="", tmp_path=None, **sp_keys): """ Try to create a data handler for given arguments. If build fails, this station does not fulfil all requirements and therefore f_proc will return None as indication. On a successful build, f_proc returns the built data handler and the station that was used. This function must be implemented globally to work together with multiprocessing. """ + assert return_strategy in ["result", "reference"] try: res = data_handler(station, **sp_keys) except (AttributeError, EmptyQueryResult, KeyError, ValueError) as e: logging.info(f"remove station {station} because it raised an error: {e}") res = None - return res, station + if return_strategy == "result": + return res, station + else: + _tmp_file = os.path.join(tmp_path, f"{station}_{'%032x' % random.getrandbits(128)}.pickle") + with open(os.path.join(tmp_path, _tmp_file), "wb") as f: + dill.dump(res, f) + return _tmp_file, station diff --git a/mlair/run_modules/experiment_setup.py b/mlair/run_modules/experiment_setup.py index 209859c1ff38efe2667c918aa5b79c96f2524be0..b11a2a33417cc27fa23122e31faa089a2dea321c 100644 --- a/mlair/run_modules/experiment_setup.py +++ b/mlair/run_modules/experiment_setup.py @@ -287,6 +287,10 @@ class ExperimentSetup(RunEnvironment): self._set_param("logging_path", None, os.path.join(experiment_path, "logging")) path_config.check_path_and_create(self.data_store.get("logging_path")) + # set tmp path + self._set_param("tmp_path", None, os.path.join(experiment_path, "tmp")) + path_config.check_path_and_create(self.data_store.get("tmp_path"), remove_existing=True) + # setup for data self._set_param("stations", stations, default=DEFAULT_STATIONS, apply=helpers.to_list) self._set_param("statistics_per_var", statistics_per_var, default=DEFAULT_VAR_ALL_DICT) diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py index 791b8b7b08d2a4267e22a2a7b38873c9d70f0fc1..e3793651d99bf1229de4114a43c260ae534c9e37 100644 --- a/mlair/run_modules/pre_processing.py +++ b/mlair/run_modules/pre_processing.py @@ -10,6 +10,8 @@ from typing import Tuple import multiprocessing import requests import psutil +import random +import dill import pandas as pd @@ -242,6 +244,7 @@ class PreProcessing(RunEnvironment): valid_stations = [] kwargs = self.data_store.create_args_dict(data_handler.requirements(), scope=set_name) use_multiprocessing = self.data_store.get("use_multiprocessing") + tmp_path = self.data_store.get("tmp_path") max_process = self.data_store.get("max_number_multiprocessing") n_process = min([psutil.cpu_count(logical=False), len(set_stations), max_process]) # use only physical cpus @@ -249,18 +252,23 @@ class PreProcessing(RunEnvironment): logging.info("use parallel validate station approach") pool = multiprocessing.Pool(n_process) logging.info(f"running {getattr(pool, '_processes')} processes in parallel") + kwargs.update({"tmp_path": tmp_path, "return_strategy": "reference"}) output = [ pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs) for station in set_stations] for i, p in enumerate(output): - dh, s = p.get() + _res_file, s = p.get() logging.info(f"...finished: {s} ({int((i + 1.) / len(output) * 100)}%)") + with open(_res_file, "rb") as f: + dh = dill.load(f) + os.remove(_res_file) if dh is not None: collection.add(dh) valid_stations.append(s) pool.close() else: # serial solution logging.info("use serial validate station approach") + kwargs.update({"return_strategy": "result"}) for station in set_stations: dh, s = f_proc(data_handler, station, set_name, store_processed_data, **kwargs) if dh is not None: @@ -288,7 +296,8 @@ class PreProcessing(RunEnvironment): def transformation(self, data_handler: AbstractDataHandler, stations): if hasattr(data_handler, "transformation"): kwargs = self.data_store.create_args_dict(data_handler.requirements(), scope="train") - transformation_dict = data_handler.transformation(stations, **kwargs) + tmp_path = self.data_store.get_default("tmp_path", default=None) + transformation_dict = data_handler.transformation(stations, tmp_path=tmp_path, **kwargs) if transformation_dict is not None: self.data_store.set("transformation", transformation_dict) @@ -312,12 +321,13 @@ class PreProcessing(RunEnvironment): logging.info("No preparation required because no competitor was provided to the workflow.") -def f_proc(data_handler, station, name_affix, store, **kwargs): +def f_proc(data_handler, station, name_affix, store, return_strategy="", tmp_path=None, **kwargs): """ Try to create a data handler for given arguments. If build fails, this station does not fulfil all requirements and therefore f_proc will return None as indication. On a successfull build, f_proc returns the built data handler and the station that was used. This function must be implemented globally to work together with multiprocessing. """ + assert return_strategy in ["result", "reference"] try: res = data_handler.build(station, name_affix=name_affix, store_processed_data=store, **kwargs) except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError, IndexError) as e: @@ -326,7 +336,15 @@ def f_proc(data_handler, station, name_affix, store, **kwargs): f"remove station {station} because it raised an error: {e} -> {' | '.join(f_inspect_error(formatted_lines))}") logging.debug(f"detailed information for removal of station {station}: {traceback.format_exc()}") res = None - return res, station + if return_strategy == "result": + return res, station + else: + if tmp_path is None: + tmp_path = os.getcwd() + _tmp_file = os.path.join(tmp_path, f"{station}_{'%032x' % random.getrandbits(128)}.pickle") + with open(os.path.join(tmp_path, _tmp_file), "wb") as f: + dill.dump(res, f) + return _tmp_file, station def f_inspect_error(formatted):