diff --git a/HPC_setup/create_runscripts_HPC.sh b/HPC_setup/create_runscripts_HPC.sh
index 7bf08a34c6285e31895d817735f319ddde5bfb04..5e37d820ae1241c09c1c87c141bdcf005044a3b7 100755
--- a/HPC_setup/create_runscripts_HPC.sh
+++ b/HPC_setup/create_runscripts_HPC.sh
@@ -87,7 +87,7 @@ timestamp=\`date +"%Y-%m-%d_%H%M-%S"\`
 
 export PYTHONPATH=\${PWD}/venv_${hpcsys}/lib/python3.6/site-packages:\${PYTHONPATH}
 
-srun python run.py --experiment_date=\$timestamp
+srun --cpu-bind=none python run.py --experiment_date=\$timestamp
 EOT
 
 echo "Created runscript: run_${hpcsys}_$1.bash"
@@ -112,7 +112,7 @@ timestamp=\`date +"%Y-%m-%d_%H%M-%S"\`
 
 export PYTHONPATH=\${PWD}/venv_${hpcsys}/lib/python3.6/site-packages:\${PYTHONPATH}
 
-srun python run_HPC.py --experiment_date=\$timestamp
+srun --cpu-bind=none python run_HPC.py --experiment_date=\$timestamp
 EOT
 
 fi
diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py
index 5a62731de44cdfa24a72cdd0d200ddb561be29c0..52835975101f5ce6881b72b127e16c0e299dfb14 100644
--- a/mlair/data_handler/default_data_handler.py
+++ b/mlair/data_handler/default_data_handler.py
@@ -12,6 +12,7 @@ import shutil
 from functools import reduce
 from typing import Tuple, Union, List
 import multiprocessing
+import psutil
 
 import numpy as np
 import xarray as xr
@@ -270,7 +271,8 @@ class DefaultDataHandler(AbstractDataHandler):
 
         if multiprocessing.cpu_count() > 1:  # parallel solution
             logging.info("use parallel transformation approach")
-            pool = multiprocessing.Pool()
+            pool = multiprocessing.Pool(
+                min([psutil.cpu_count(logical=False), len(set_stations), 16]))  # use only physical cpus
             logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
             output = [
                 pool.apply_async(f_proc, args=(cls.data_handler_transformation, station), kwds=sp_keys)
diff --git a/mlair/helpers/statistics.py b/mlair/helpers/statistics.py
index ea5a9f05c8ff91a5cd6be678ad03d12b923a4bec..ad6a368fdf7980639802412201e964def80669b2 100644
--- a/mlair/helpers/statistics.py
+++ b/mlair/helpers/statistics.py
@@ -311,7 +311,6 @@
 
         for iahead in ahead_names:
             data = internal_data.sel(ahead=iahead)
-            external_data = self.external_data.sel(ahead=iahead, type=[self.observation_name])
 
             skill_score.loc[["CASE I", "AI", "BI", "CI"], iahead] = np.stack(self._climatological_skill_score(
                 data, mu_type=1, forecast_name=forecast_name, observation_name=self.observation_name).values.flatten())
@@ -319,14 +318,15 @@
             skill_score.loc[["CASE II", "AII", "BII"], iahead] = np.stack(self._climatological_skill_score(
                 data, mu_type=2, forecast_name=forecast_name, observation_name=self.observation_name).values.flatten())
 
-            # if external_data is not None:
-            skill_score.loc[["CASE III", "AIII"], iahead] = np.stack(self._climatological_skill_score(
-                data, mu_type=3, forecast_name=forecast_name, observation_name=self.observation_name,
-                external_data=external_data).values.flatten())
+            if self.external_data is not None:
+                external_data = self.external_data.sel(ahead=iahead, type=[self.observation_name])
+                skill_score.loc[["CASE III", "AIII"], iahead] = np.stack(self._climatological_skill_score(
+                    data, mu_type=3, forecast_name=forecast_name, observation_name=self.observation_name,
+                    external_data=external_data).values.flatten())
 
-            skill_score.loc[["CASE IV", "AIV", "BIV", "CIV"], iahead] = np.stack(self._climatological_skill_score(
-                data, mu_type=4, forecast_name=forecast_name, observation_name=self.observation_name,
-                external_data=external_data).values.flatten())
+                skill_score.loc[["CASE IV", "AIV", "BIV", "CIV"], iahead] = np.stack(self._climatological_skill_score(
+                    data, mu_type=4, forecast_name=forecast_name, observation_name=self.observation_name,
+                    external_data=external_data).values.flatten())
 
         return skill_score
 
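Note on the `multiprocessing.Pool` sizing above (the same pattern recurs in `pre_processing.py` further down): the worker count is now the minimum of the physical core count, the number of stations, and a hard cap of 16, instead of one worker per logical CPU. A minimal standalone sketch of that heuristic; `bounded_pool`, `n_tasks` and `hard_cap` are illustrative names, not part of the MLAir code base, and, like the patch itself, the sketch does not handle platforms where `psutil.cpu_count(logical=False)` returns `None`:

```python
# Sketch of the pool-sizing heuristic from the hunks above; names are invented.
import multiprocessing

import psutil


def bounded_pool(n_tasks, hard_cap=16):
    """Open a pool no larger than the physical core count, the task count, or hard_cap."""
    n_workers = min([psutil.cpu_count(logical=False), n_tasks, hard_cap])
    return multiprocessing.Pool(n_workers)


if __name__ == "__main__":
    with bounded_pool(n_tasks=4) as pool:
        print(pool.map(abs, [-1, -2, -3, -4]))  # prints [1, 2, 3, 4]
```

Capping at physical cores avoids oversubscribing hyper-threads on CPU-bound work, and capping at the task count avoids spawning idle workers for short station lists.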
diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py
index c810d3c5517643612f773bdbffc3e0e029d9150d..127066b87cdf507519836ff916681e4885053c09 100644
--- a/mlair/run_modules/post_processing.py
+++ b/mlair/run_modules/post_processing.py
@@ -407,7 +407,7 @@
         be found inside `forecast_path`.
         """
         subset_type = subset.name
-        logging.debug(f"start make_prediction for {subset_type}")
+        logging.info(f"start make_prediction for {subset_type}")
         time_dimension = self.data_store.get("time_dim")
         window_dim = self.data_store.get("window_dim")
         subset_type = subset.name
@@ -627,7 +627,7 @@
         try:
             file = os.path.join(path, f"forecasts_{str(station)}_train_val.nc")
             return xr.open_dataarray(file)
-        except (IndexError, KeyError):
+        except (IndexError, KeyError, FileNotFoundError):
             return None
 
     def _get_external_data(self, station: str, path: str) -> Union[xr.DataArray, None]:
@@ -642,9 +642,20 @@
         try:
             file = os.path.join(path, f"forecasts_{str(station)}_test.nc")
             return xr.open_dataarray(file)
-        except (IndexError, KeyError):
+        except (IndexError, KeyError, FileNotFoundError):
             return None
 
+    @staticmethod
+    def _combine_forecasts(forecast, competitor, dim="type"):
+        """
+        Combine forecast and competitor if both are xarray objects. If competitor is None, this returns forecast,
+        and vice versa.
+        """
+        try:
+            return xr.concat([forecast, competitor], dim=dim)
+        except (TypeError, AttributeError):
+            return forecast if competitor is None else competitor
+
     def calculate_skill_scores(self) -> Tuple[Dict, Dict]:
         """
         Calculate skill scores of NN forecast.
@@ -656,16 +667,20 @@
         :return: competitive and climatological skill scores
         """
         path = self.data_store.get("forecast_path")
+        all_stations = self.data_store.get("stations")
         skill_score_competitive = {}
         skill_score_climatological = {}
-        for station in self.test_data:
+        for station in all_stations:
             external_data = self._get_external_data(station, path)
-            competitor = self.load_competitors(str(station))
-            combined = xr.concat([external_data, competitor], dim="type") if competitor is not None else external_data
-            skill_score = statistics.SkillScores(combined, models=remove_items(list(combined.type.values), "obs"))
-            skill_score_competitive[station] = skill_score.skill_scores(self.window_lead_time)
-
-            internal_data = self._get_internal_data(station, path)  # ToDo: check if external is still right?
-            skill_score_climatological[station] = skill_score.climatological_skill_scores(
-                internal_data, self.window_lead_time, forecast_name=self.forecast_indicator)
+            competitor = self.load_competitors(station)
+            combined = self._combine_forecasts(external_data, competitor, dim="type")
+            model_list = remove_items(list(combined.type.values), "obs") if combined is not None else None
+            skill_score = statistics.SkillScores(combined, models=model_list)
+            if external_data is not None:
+                skill_score_competitive[station] = skill_score.skill_scores(self.window_lead_time)
+
+            internal_data = self._get_internal_data(station, path)
+            if internal_data is not None:
+                skill_score_climatological[station] = skill_score.climatological_skill_scores(
+                    internal_data, self.window_lead_time, forecast_name=self.forecast_indicator)
         return skill_score_competitive, skill_score_climatological
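Note on the new `_combine_forecasts` helper above: rather than testing both operands for `None` explicitly, it lets `xr.concat` fail and falls back to whichever operand exists. A small usage sketch; the data values and the `type` labels `nn` and `ols` are invented for illustration:

```python
# Illustration of the _combine_forecasts fallback behaviour; data is made up.
import numpy as np
import xarray as xr

forecast = xr.DataArray(np.zeros((3, 1)), dims=["index", "type"], coords={"type": ["nn"]})
competitor = xr.DataArray(np.ones((3, 1)), dims=["index", "type"], coords={"type": ["ols"]})

both = xr.concat([forecast, competitor], dim="type")  # "type" now holds ["nn", "ols"]
# With competitor=None, xr.concat raises (TypeError/AttributeError is caught by
# the helper), and the helper returns forecast alone; with both None it returns None.
```

Returning `None` when both inputs are missing is what lets the new guards in `calculate_skill_scores` skip stations without forecast files instead of crashing on them.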
diff --git a/mlair/run_modules/pre_processing.py b/mlair/run_modules/pre_processing.py
index f696b0065b1db2692110488bd41513cd74aca233..cdf195e705238252b117955ab1959c4177cbd17a 100644
--- a/mlair/run_modules/pre_processing.py
+++ b/mlair/run_modules/pre_processing.py
@@ -8,6 +8,7 @@ import os
 from typing import Tuple
 import multiprocessing
 import requests
+import psutil
 
 import numpy as np
 import pandas as pd
@@ -264,13 +265,15 @@
 
         if multiprocessing.cpu_count() > 1:  # parallel solution
             logging.info("use parallel validate station approach")
-            pool = multiprocessing.Pool()
+            pool = multiprocessing.Pool(
+                min([psutil.cpu_count(logical=False), len(set_stations), 16]))  # use only physical cpus
             logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
             output = [
                 pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs)
                 for station in set_stations]
-            for p in output:
+            for i, p in enumerate(output):
                 dh, s = p.get()
+                logging.info(f"...finished: {s} ({int((i + 1.) / len(output) * 100)}%)")
                 if dh is not None:
                     collection.add(dh)
                     valid_stations.append(s)
@@ -352,8 +355,7 @@
     the station that was used. This function must be implemented globally to work together with multiprocessing.
     """
     try:
-        res = data_handler.build(station, name_affix=name_affix, store_processed_data=store,
-                                 **kwargs)
+        res = data_handler.build(station, name_affix=name_affix, store_processed_data=store, **kwargs)
     except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError) as e:
         logging.info(f"remove station {station} because it raised an error: {e}")
         res = None
diff --git a/requirements.txt b/requirements.txt
index a5854fea755d20cc95afd161f587e709a29cfd19..b0a6e7f59896fd0edf08977ee553c803f6c2e960 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -34,6 +34,7 @@ patsy==0.5.1
 Pillow==8.1.0
 pluggy==0.13.1
 protobuf==3.15.0
+psutil==5.8.0
 py==1.10.0
 pydot==1.4.2
 pyparsing==2.4.7
diff --git a/requirements_gpu.txt b/requirements_gpu.txt
index 809eb0b303a745ae9d68dfb5aa059aeebcf24ac6..35fe0d5ee2a03f01737bc185d2a5bbaf26383806 100644
--- a/requirements_gpu.txt
+++ b/requirements_gpu.txt
@@ -34,6 +34,7 @@ patsy==0.5.1
 Pillow==8.1.0
 pluggy==0.13.1
 protobuf==3.15.0
+psutil==5.8.0
 py==1.10.0
 pydot==1.4.2
 pyparsing==2.4.7
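Note on the progress logging added to the validate-station loop in `pre_processing.py`: `apply_async` returns `AsyncResult` objects immediately, and each `p.get()` blocks until that station's worker is done, so the percentage reflects results collected in submission order rather than strict completion order. A runnable toy version of the pattern; the `square` worker is a stand-in for `f_proc` and not part of MLAir:

```python
# Toy version of the progress-reporting pattern from pre_processing.py.
import logging
import multiprocessing


def square(x):
    # Stand-in worker; must live at module level so it can be pickled.
    return x * x


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    with multiprocessing.Pool(2) as pool:
        output = [pool.apply_async(square, args=(n,)) for n in range(10)]
        for i, p in enumerate(output):  # get() blocks until result i is ready
            res = p.get()
            logging.info(f"...finished: {res} ({int((i + 1.) / len(output) * 100)}%)")
```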