diff --git a/mlair/data_handler/data_handler_wrf_chem.py b/mlair/data_handler/data_handler_wrf_chem.py index 092b9ff67144166253bfa5ae8ab2802835ec8ecd..a93761b844c966fac04708b3e8cca4dbeab15a00 100644 --- a/mlair/data_handler/data_handler_wrf_chem.py +++ b/mlair/data_handler/data_handler_wrf_chem.py @@ -5,6 +5,7 @@ import xarray as xr import numpy as np import itertools import matplotlib.pyplot as plt +from dask.diagnostics import ProgressBar import dask import inspect @@ -370,7 +371,9 @@ class SingleGridColumnWrfChemDataLoader(BaseWrfChemDataLoader): @TimeTrackingWrapper def _set_nearest_icoords(self, dim=None): lat, lon = self.get_coordinates(as_arrays=True) - self._nearest_icoords = dask.compute(self.compute_nearest_icoordinates(lat, lon, dim))[0] + with ProgressBar(): + logging.info("SingleGridColumnWrfChemDataLoader: compute nearest icoordinates") + self._nearest_icoords = dask.compute(self.compute_nearest_icoordinates(lat, lon, dim))[0] def get_nearest_icoords(self, as_arrays=False): if as_arrays: @@ -488,7 +491,9 @@ class DataHandlerSingleGridColumn(DataHandlerSingleStation): # transpose dataarray: set first three fixed and keep remaining as is data = data.transpose(self.iter_dim, self.time_dim, self.target_dim, ...) - data = dask.compute(self._slice_prep(data, start=start, end=end))[0] + with ProgressBar(): + logging.info("DataHandlerSingleGridColumn.load_data: compute data for slice_prep") + data = dask.compute(self._slice_prep(data, start=start, end=end))[0] # ToDo # data should somehow look like this: # < xarray.DataArray(Stations: 1, datetime: 7670, variables: 9) (From DataHandlerSingleStation) @@ -696,12 +701,8 @@ class DataHandlerSectorGrid(DataHandlerSingleGridColumn): # setup sector history sector_history = xr.ones_like(self.history) sector_history_var_names = [f"{var}_sect" for var in sector_history.coords[self.target_dim].values] - # sector_history = sector_history.assign_coords({self.target_dim: sector_history_var_names}) grid_data = self.preselect_and_transform_neighbouring_data_based_on_radius(loader) - # grid_data = grid_data.expand_dims(self.iter_dim, -1).assign_coords( - # {self.iter_dim: self.history.coords[self.iter_dim].values}) - # sec_data_history_var_names = [f"{var}_sect" for var in self.history.coords[self.target_dim].values] logging.info("preselect_and_transform_neighbouring_data_based_on_radius(loader)") for sect in existing_sectors: # select data in wind sector @@ -718,17 +719,11 @@ class DataHandlerSectorGrid(DataHandlerSingleGridColumn): sector_history * 1.) sector_history = sector_history.assign_coords({self.target_dim: sector_history_var_names}) - sector_history = sector_history.compute() - combined_history = xr.concat([self.history, sector_history], dim=self.target_dim) + with ProgressBar(): + logging.info(f"compute `sector_history' for modify_history") + sector_history = sector_history.compute() - # loader.data.T2.where(loader.geo_infos.dist.sel({'points': 0}).drop('points') <= self.radius).where( - # self.windsector.is_in_sector(sect, loader.geo_infos.bearing)) - # - # loader.data[self.variables].sel( - # {self.time_dim: self.history.coords[self.time_dim].values}).where( - # loader.geo_infos.dist.sel({'points': 0}).drop('points') <= self.radius).where( - # self.windsector.is_in_sector(sect, - # loader.geo_infos.bearing.drop('points').squeeze())) + combined_history = xr.concat([self.history, sector_history], dim=self.target_dim) return combined_history else: diff --git a/mlair/data_handler/default_data_handler.py b/mlair/data_handler/default_data_handler.py index 66f6b5b0b64c1126237880c2a722487254a71d21..6580341e7c88bad55f40186ded7e5c5eb3c7b892 100644 --- a/mlair/data_handler/default_data_handler.py +++ b/mlair/data_handler/default_data_handler.py @@ -15,6 +15,7 @@ from typing import Tuple, Union, List import multiprocessing import psutil import dask +from dask.diagnostics import ProgressBar import numpy as np import xarray as xr @@ -100,7 +101,9 @@ class DefaultDataHandler(AbstractDataHandler): @staticmethod def _force_dask_computation(data): try: - data = dask.compute(data)[0] + with ProgressBar(): + logging.info(f"DefaultDH: _force_dask_computation") + data = dask.compute(data)[0] except: logging.info("can't execute dask.compute") return data