diff --git a/mlair/data_handler/data_handler_wrf_chem.py b/mlair/data_handler/data_handler_wrf_chem.py index 94dcc9c2b30bd4658d0c29dc7bde913d3f70c3d5..ae1fc9f99491142077396631150fa348313625ef 100644 --- a/mlair/data_handler/data_handler_wrf_chem.py +++ b/mlair/data_handler/data_handler_wrf_chem.py @@ -335,42 +335,53 @@ class BaseWrfChemDataLoader: if self._data is None: raise IOError(f"`open_data' must be called before vector interpolation and rotation") for (u_stag_name, v_stag_name), (u_ll_name, v_ll_name) in self.vars_to_rotate: - u_staged_field = self._data[u_stag_name] - v_staged_field = self._data[v_stag_name] - - u_target_field = self._data[mapping_of_stag2unstag[u_stag_name]] - v_target_field = self._data[mapping_of_stag2unstag[v_stag_name]] - - # This if statements are needed as cdo might have removed empty attrs from variables - if "stagger" not in u_staged_field.attrs: - u_staged_field.attrs['stagger'] = "" - if "stagger" not in v_staged_field.attrs: - v_staged_field.attrs['stagger'] = "" - - u_stagger = u_staged_field.attrs['stagger'] - v_stagger = v_staged_field.attrs['stagger'] - - if u_stagger: - u_grd = self.set_interpolated_field(staged_field=u_staged_field, target_field=u_target_field) - else: - u_grd = self._data[u_stag_name] - - if v_stagger: - v_grd = self.set_interpolated_field(staged_field=v_staged_field, target_field=v_target_field) - else: - v_grd = self._data[v_stag_name] - - if u_ll_name[0] == 'U' and v_ll_name[0] == 'V': - ull, vll = vectorrot.ugrd_vgrd2ull_vll(u_grd, v_grd) - elif u_ll_name[:4] == 'wspd' and v_ll_name[:4] == 'wdir': - ull, vll = vectorrot.ugrd_vgrd2wspd_wdir(u_grd, v_grd) - else: - raise ValueError( - f"`u_ll_name' and `u_ll_name' must either start with 'U' and 'V' or 'wspd' and 'wdir', " - f"but they are u_ll_name={u_ll_name} and v_ll_name={v_ll_name}") - - self._data[u_ll_name] = ull - self._data[v_ll_name] = vll + print((u_stag_name, v_stag_name), (u_ll_name, v_ll_name)) + try: + u_staged_field = self._data[u_stag_name] + v_staged_field = self._data[v_stag_name] + + u_target_field = self._data[mapping_of_stag2unstag[u_stag_name]] + v_target_field = self._data[mapping_of_stag2unstag[v_stag_name]] + + # This if statements are needed as cdo might have removed empty attrs from variables + if "stagger" not in u_staged_field.attrs: + u_staged_field.attrs['stagger'] = "" + if "stagger" not in v_staged_field.attrs: + v_staged_field.attrs['stagger'] = "" + + u_stagger = u_staged_field.attrs['stagger'] + v_stagger = v_staged_field.attrs['stagger'] + + if u_stagger: + u_grd = self.set_interpolated_field(staged_field=u_staged_field, target_field=u_target_field) + else: + u_grd = self._data[u_stag_name] + + if v_stagger: + v_grd = self.set_interpolated_field(staged_field=v_staged_field, target_field=v_target_field) + else: + v_grd = self._data[v_stag_name] + + if v_grd.dims != (self.physical_t_coord_name, self.logical_y_coord_name, self.logical_x_coord_name): + v_grd = v_grd.transpose(self.physical_t_coord_name, self.logical_y_coord_name, + self.logical_x_coord_name) + if u_grd.dims != (self.physical_t_coord_name, self.logical_y_coord_name, self.logical_x_coord_name): + u_grd = u_grd.transpose(self.physical_t_coord_name, self.logical_y_coord_name, + self.logical_x_coord_name) + + if u_ll_name[0] == 'U' and v_ll_name[0] == 'V': + ull, vll = vectorrot.ugrd_vgrd2ull_vll(u_grd, v_grd) + elif u_ll_name[:4] == 'wspd' and v_ll_name[:4] == 'wdir': + ull, vll = vectorrot.ugrd_vgrd2wspd_wdir(u_grd, v_grd) + else: + raise ValueError( + f"`u_ll_name' and `u_ll_name' must either start with 'U' and 'V' or 'wspd' and 'wdir', " + f"but they are u_ll_name={u_ll_name} and v_ll_name={v_ll_name}") + + self._data[u_ll_name] = ull + self._data[v_ll_name] = vll + except KeyError: + pass def set_interpolated_field(self, staged_field: xr.DataArray, target_field: xr.DataArray, dropped_staged_attrs: List[str] =None, **kwargs): @@ -497,21 +508,50 @@ class SingleGridColumnWrfChemDataLoader(BaseWrfChemDataLoader): if self.external_coords_file is not None: self._apply_external_coordinates() - self.apply_staged_transormation() - self._set_geoinfos() + # self.apply_staged_transormation() + # self._set_geoinfos() if self.vars_for_unit_conv is not None: self.convert_chemical_units() if self.lazy is False: self.reset_data_by_other(self.apply_toarstats()) + self.apply_staged_transormation() else: self.load_lazy() self.store_lazy() + + self._set_geoinfos() # self.rechunk_data(self.rechunk_values) return self + def get_wrfnames_from_statistics_per_var(self): + """ + Returns U and V or U10 and V10 instead of winddir windspd due to averaging. + :return: + :rtype: + """ + working_dict = self.statistics_per_var.copy() + replacer = ((("U10", "V10"), ("wdir10ll", "wspd10ll")), + (("U", "V"), ("wdirll", "wspdll")) + ) + for (u_rep, v_rep), (wdir_org, wspd_org) in replacer: + if wdir_org in self.statistics_per_var.keys(): + # try: + working_dict[u_rep] = self.statistics_per_var[wdir_org] + working_dict[v_rep] = self.statistics_per_var[wdir_org] + working_dict.pop(wdir_org) + if wspd_org in self.statistics_per_var.keys(): + # try: + working_dict.pop(wspd_org) + # except: + # pass + # except: + # pass + + return working_dict + def reset_data_by_other(self, other: xr.Dataset): attrs = self._data.attrs self._data = other @@ -538,9 +578,11 @@ class SingleGridColumnWrfChemDataLoader(BaseWrfChemDataLoader): except FileNotFoundError: logging.debug(f"{self.station[0]}: could not use lazy data") self.reset_data_by_other(self.apply_toarstats()) + self.apply_staged_transormation() except OSError: logging.debug(f"{self.station[0]}: could not use lazy data") self.reset_data_by_other(self.apply_toarstats()) + self.apply_staged_transormation() def _extract_lazy(self, lazy_data): # _data, self.meta, _input_data, _target_data = lazy_data @@ -728,8 +770,11 @@ class SingleGridColumnWrfChemDataLoader(BaseWrfChemDataLoader): else: target_time_type_or_zone = self.target_time_type collector = [] - for var in self.statistics_per_var.keys(): - collector.append(self.__toarstats_aggregation(data, target_time_type_or_zone, meta, target_sampling, var)) + + vars_for_toarstats = self.get_wrfnames_from_statistics_per_var() + # for var in self.statistics_per_var.keys(): + for var, var_stats in vars_for_toarstats.items(): + collector.append(self._toarstats_aggregation(data, target_time_type_or_zone, meta, target_sampling, var, var_stats)) sampling_data = xr.merge(collector).dropna(self.physical_t_coord_name) # sampling_data.attrs = data.attrs missing_squeezed_coords = data.coords._names - sampling_data.coords._names @@ -738,8 +783,8 @@ class SingleGridColumnWrfChemDataLoader(BaseWrfChemDataLoader): return sampling_data - def __toarstats_aggregation(self, data, target_time_type_or_zone, meta, target_sampling, var): - with TimeTracking(name=f"{self.station}: apply toarstats `{self.statistics_per_var[var]}({var})`"): + def _toarstats_aggregation(self, data, target_time_type_or_zone, meta, target_sampling, var, var_stats): + with TimeTracking(name=f"{self.station}: apply toarstats `{var_stats}({var})`"): spatial_dims = list(remove_items(data[var].dims, self.physical_t_coord_name)) try: df = data[var].to_dataframe()[[var]].reset_index(level=spatial_dims) @@ -750,13 +795,16 @@ class SingleGridColumnWrfChemDataLoader(BaseWrfChemDataLoader): df = df[[var] + spatial_dims] df = self.set_external_time_zone_and_convert_to_target_time(df, target_time_type_or_zone) df = df.groupby(spatial_dims) - df = df.apply(self._toarstats_wrapper, sampling=target_sampling, statistics=self.statistics_per_var[var], + df = df.apply(self._toarstats_wrapper, sampling=target_sampling, statistics=var_stats, metadata=(meta[self.physical_y_coord_name], meta[self.physical_x_coord_name])) df.columns = [var] df.index.set_names(df.index.names[:len(spatial_dims)] + [self.physical_t_coord_name], inplace=True) # df = df.to_xarray().to_array(self.target_dim) df = df.to_xarray() - df = df.chunk({self.logical_x_coord_name: -1}) + try: + df = df.chunk({self.logical_x_coord_name: -1}) + except ValueError: + df = df.chunk({self.logical_y_coord_name: -1}) # collector.append(df) return df diff --git a/mlair/run_modules/post_processing.py b/mlair/run_modules/post_processing.py index 4149f9a1b3aa74139a35dc6e8ba85de88f06725c..5985db43fcd35491122de9739de9f607c0e3909e 100644 --- a/mlair/run_modules/post_processing.py +++ b/mlair/run_modules/post_processing.py @@ -991,12 +991,17 @@ class PostProcessing(RunEnvironment): external_data.coords[self.model_type_dim] = [ {self.forecast_indicator: self.model_display_name}.get(n, n) for n in external_data.coords[self.model_type_dim].values] - external_data_expd = external_data.assign_coords({self.iter_dim: station}) - external_data_expd = external_data_expd.expand_dims(self.iter_dim).to_dataset(self.iter_dim) - external_data_expd.to_netcdf(os.path.join(path, f"forecasts_ds_{str(station)}_test.nc")) + competitor = self.load_competitors(station) + combined = self._combine_forecasts(external_data, competitor, dim=self.model_type_dim) + combined_expd = combined.assign_coords({self.iter_dim: station}) + combined_expd = combined_expd.expand_dims(self.iter_dim).to_dataset(self.iter_dim) + combined_expd.to_netcdf(os.path.join(path, f"forecasts_ds_{str(station)}_test.nc")) + # external_data_expd = external_data.assign_coords({self.iter_dim: station}) + # external_data_expd = external_data_expd.expand_dims(self.iter_dim).to_dataset(self.iter_dim) + # external_data_expd.to_netcdf(os.path.join(path, f"forecasts_ds_{str(station)}_test.nc")) @TimeTrackingWrapper - def calculate_error_metrics_based_on_upstream_wind_dir(self, ref_name: str = ["ols", "persi"]) -> xr.DataArray: + def calculate_error_metrics_based_on_upstream_wind_dir(self, ref_name: Union[str, List[str]] = None) -> xr.DataArray: """ Calculates the error metrics (mse)/skill scores based on the wind sector at time t0. @@ -1004,6 +1009,8 @@ class PostProcessing(RunEnvironment): :return: :rtype: """ + if ref_name is None: + ref_name = "ols" self.load_forecast_array_and_store_as_dataset() path = self.data_store.get("forecast_path") files = os.path.join(path, "forecasts_ds_*_test.nc")