Skip to content
Snippets Groups Projects
Commit 31e10e8f authored by felix kleinert's avatar felix kleinert
Browse files

Merge branch 'felix_isstue288_tech-dacompute-dump' into 'develop'

Resolve "DefaultDataHandler: dask.compute() before dump to file"

Closes #288

See merge request toar/mlair!263
parents 3d3f497f e8e8aa69
Branches
Tags
6 merge requests!319add all changes of dev into release v1.4.0 branch,!318Resolve "release v1.4.0",!283Merge latest develop into falcos issue,!264Merge develop into felix_issue287_tech-wrf-datahandler-should-inherit-from-singlestationdatahandler,!263Resolve "DefaultDataHandler: dask.compute() before dump to file",!259Draft: Resolve "WRF-Datahandler should inherit from SingleStationDatahandler"
Pipeline #62008 passed
This commit is part of merge request !318. Comments created here will be created in the context of that merge request.
...@@ -13,6 +13,7 @@ from functools import reduce ...@@ -13,6 +13,7 @@ from functools import reduce
from typing import Tuple, Union, List from typing import Tuple, Union, List
import multiprocessing import multiprocessing
import psutil import psutil
import dask
import numpy as np import numpy as np
import xarray as xr import xarray as xr
...@@ -83,11 +84,20 @@ class DefaultDataHandler(AbstractDataHandler): ...@@ -83,11 +84,20 @@ class DefaultDataHandler(AbstractDataHandler):
if store_processed_data is True: if store_processed_data is True:
self._cleanup() if fresh_store is True else None self._cleanup() if fresh_store is True else None
data = {"X": self._X, "Y": self._Y, "X_extreme": self._X_extreme, "Y_extreme": self._Y_extreme} data = {"X": self._X, "Y": self._Y, "X_extreme": self._X_extreme, "Y_extreme": self._Y_extreme}
data = self._force_dask_computation(data)
with open(self._save_file, "wb") as f: with open(self._save_file, "wb") as f:
pickle.dump(data, f) pickle.dump(data, f)
logging.debug(f"save pickle data to {self._save_file}") logging.debug(f"save pickle data to {self._save_file}")
self._reset_data() self._reset_data()
@staticmethod
def _force_dask_computation(data):
try:
data = dask.compute(data)[0]
except:
pass
return data
def _load(self): def _load(self):
try: try:
with open(self._save_file, "rb") as f: with open(self._save_file, "rb") as f:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment