diff --git a/downscaling_ap5/HPC_batch_scripts/train_unet_model_template_e4.sh b/downscaling_ap5/HPC_batch_scripts/train_unet_template_e4.sh similarity index 88% rename from downscaling_ap5/HPC_batch_scripts/train_unet_model_template_e4.sh rename to downscaling_ap5/HPC_batch_scripts/train_unet_template_e4.sh index 3b0718cfde2cef65982e29e343878a9d34d75ab6..5b5539788018e32e2380a14a36cc0d3a2321c690 100644 --- a/downscaling_ap5/HPC_batch_scripts/train_unet_model_template_e4.sh +++ b/downscaling_ap5/HPC_batch_scripts/train_unet_template_e4.sh @@ -9,8 +9,8 @@ #SBATCH --gres=gpu:1 ##SBATCH --mem=40G #SBATCH --time=01:00:00 -#SBATCH --output=train_wgan-model-out.%j -#SBATCH --error=train_wgan-model-err.%j +#SBATCH --output=train_wgan-out.%j +#SBATCH --error=train_wgan-err.%j ######### Template identifier (don't remove) ######### echo "Do not run the template scripts" @@ -47,9 +47,9 @@ export PYTHONPATH=${BASE_DIR}/models:$PYTHONPATH export PYTHONPATH=${BASE_DIR}/postprocess:$PYTHONPATH echo ${PYTHONPATH} -# data-directories -# Note template uses Tier2-dataset. Adapt accordingly for other datasets. -indir=/data/maelstrom/langguth1/tier2/train +# data-directories +# Adapt accordingly to your dataset +indir=<my_input_dir> outdir=${BASE_DIR}/trained_models/ js_model_conf=${BASE_DIR}/config/config_unet.json js_ds_conf=${BASE_DIR}/config/config_ds_tier2.json diff --git a/downscaling_ap5/HPC_batch_scripts/train_unet_model_template_jsc.sh b/downscaling_ap5/HPC_batch_scripts/train_unet_template_jsc.sh similarity index 83% rename from downscaling_ap5/HPC_batch_scripts/train_unet_model_template_jsc.sh rename to downscaling_ap5/HPC_batch_scripts/train_unet_template_jsc.sh index 7142b664561a493294719e2b8c6f4809665420a4..f14969a7d76fd576961d8ed070870e4ac8a6d63c 100644 --- a/downscaling_ap5/HPC_batch_scripts/train_unet_model_template_jsc.sh +++ b/downscaling_ap5/HPC_batch_scripts/train_unet_template_jsc.sh @@ -4,9 +4,10 @@ #SBATCH --ntasks=1 ##SBATCH --ntasks-per-node=1 #SBATCH --cpus-per-task=48 -#SBATCH --output=train_unet-model-out.%j -#SBATCH --error=train_unet-model-err.%j +#SBATCH --output=train_unet-out.%j +#SBATCH --error=train_unet-err.%j #SBATCH --time=02:00:00 +##SBATCH --time=20:00:00 #SBATCH --gres=gpu:1 ##SBATCH --partition=batch ##SBATCH --partition=gpus @@ -21,8 +22,9 @@ echo "Do not run the template scripts" exit 99 ######### Template identifier (don't remove) ######### -# pin CPUs (needed for Slurm>22.05) -export SRUN_CPUS_PER_TASK=${SLURM_CPUS_PER_TASK} +# environmental variables to support cpus_per_task with Slurm>22.05 +export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK} +export SRUN_CPUS_PER_TASK="${SLURM_CPUS_PER_TASK}" # basic directories WORK_DIR=$(pwd) @@ -46,7 +48,8 @@ if [ -z ${VIRTUAL_ENV} ]; then fi -# data-directories +# data-directories +# Adapt accordingly to your dataset indir=<training_data_dir> outdir=${BASE_DIR}/trained_models/ js_model_conf=${BASE_DIR}/config/config_unet.json diff --git a/downscaling_ap5/HPC_batch_scripts/train_wgan_model_template_e4.sh b/downscaling_ap5/HPC_batch_scripts/train_wgan_template_e4.sh similarity index 86% rename from downscaling_ap5/HPC_batch_scripts/train_wgan_model_template_e4.sh rename to downscaling_ap5/HPC_batch_scripts/train_wgan_template_e4.sh index 52669e97bee835e7d2c6eba654926779945cd02d..d370c29f896bef32948c1d9e02c02950dcc6c492 100644 --- a/downscaling_ap5/HPC_batch_scripts/train_wgan_model_template_e4.sh +++ b/downscaling_ap5/HPC_batch_scripts/train_wgan_template_e4.sh @@ -9,8 +9,8 @@ #SBATCH --gres=gpu:1 ##SBATCH --mem=40G #SBATCH 
--time=01:00:00 -#SBATCH --output=train_wgan-model-out.%j -#SBATCH --error=train_wgan-model-err.%j +#SBATCH --output=train_wgan-out.%j +#SBATCH --error=train_wgan-err.%j ######### Template identifier (don't remove) ######### echo "Do not run the template scripts" @@ -47,10 +47,10 @@ export PYTHONPATH=${BASE_DIR}/models:$PYTHONPATH export PYTHONPATH=${BASE_DIR}/postprocess:$PYTHONPATH echo ${PYTHONPATH} -# data-directories -# Note template uses Tier2-dataset. Adapt accordingly for other datasets. -indir=/data/maelstrom/langguth1/tier2/train -outdir=${BASE_DIR}/tranied_models/ +# data-directories +# Adapt accordingly to your dataset +indir=<my_input_dir> +outdir=${BASE_DIR}/trained_models/ js_model_conf=${BASE_DIR}/config/config_wgan.json js_ds_conf=${BASE_DIR}/config/config_ds_tier2.json diff --git a/downscaling_ap5/HPC_batch_scripts/train_wgan_model_template_jsc.sh b/downscaling_ap5/HPC_batch_scripts/train_wgan_template_jsc.sh similarity index 83% rename from downscaling_ap5/HPC_batch_scripts/train_wgan_model_template_jsc.sh rename to downscaling_ap5/HPC_batch_scripts/train_wgan_template_jsc.sh index ac3986d62f941799865cdc65d4720cf3e0e11c21..cef8bee4a413dc8df53e9ec04347add8a0ffbd7f 100644 --- a/downscaling_ap5/HPC_batch_scripts/train_wgan_model_template_jsc.sh +++ b/downscaling_ap5/HPC_batch_scripts/train_wgan_template_jsc.sh @@ -4,9 +4,10 @@ #SBATCH --ntasks=1 ##SBATCH --ntasks-per-node=1 #SBATCH --cpus-per-task=48 -#SBATCH --output=train_wgan-model-out.%j -#SBATCH --error=train_wgan-model-err.%j +#SBATCH --output=train_wgan-out.%j +#SBATCH --error=train_wgan-err.%j #SBATCH --time=02:00:00 +##SBATCH --time=20:00:00 #SBATCH --gres=gpu:1 ##SBATCH --partition=batch ##SBATCH --partition=gpus @@ -21,8 +22,9 @@ echo "Do not run the template scripts" exit 99 ######### Template identifier (don't remove) ######### -# pin CPUs (needed for Slurm>22.05) -export SRUN_CPUS_PER_TASK=${SLURM_CPUS_PER_TASK} +# environmental variables to support cpus_per_task with Slurm>22.05 +export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK} +export SRUN_CPUS_PER_TASK="${SLURM_CPUS_PER_TASK}" # basic directories WORK_DIR=$(pwd) @@ -46,7 +48,8 @@ if [ -z ${VIRTUAL_ENV} ]; then fi -# data-directories +# data-directories +# Adapt accordingly to your dataset indir=<training_data_dir> outdir=${BASE_DIR}/trained_models/ js_model_conf=${BASE_DIR}/config/config_wgan.json diff --git a/downscaling_ap5/config/config_ds_tier2_wind.json b/downscaling_ap5/config/config_ds_tier2_wind.json new file mode 100644 index 0000000000000000000000000000000000000000..9591a95ef1502d85659e42f285a8ff5b2bce4ef7 --- /dev/null +++ b/downscaling_ap5/config/config_ds_tier2_wind.json @@ -0,0 +1,6 @@ +{ + "norm_dims": ["time", "rlat", "rlon"], + "batch_size": 32, + "var_tar2in": "hsurf_tar", + "predictands": ["u_10m_tar", "v_10m_tar", "hsurf_tar"] +} diff --git a/downscaling_ap5/handle_data/handle_data_class.py b/downscaling_ap5/handle_data/handle_data_class.py index 9a9f5ab293574ec52929a208ee42e18c5416e3c7..4849de9bf506ca5e3b46d9e892d1ef38b56d765e 100644 --- a/downscaling_ap5/handle_data/handle_data_class.py +++ b/downscaling_ap5/handle_data/handle_data_class.py @@ -663,8 +663,6 @@ class StreamMonthlyNetCDF(object): # free memory free_mem([ds_add, add_samples, istart]) - # free memory - free_mem([nsamples]) self.data_loaded[il] = data_now # timing t_read = timer() - t0 @@ -672,6 +670,8 @@ class StreamMonthlyNetCDF(object): self.ds_proc_size += data_now.nbytes print(f"Dataset #{set_ind:d} ({il+1:d}/2) reading time: {t_read:.2f}s.") self.iload_next = il + 1 + # 
free memory + free_mem([nsamples, t_read, data_now]) return il diff --git a/downscaling_ap5/main_scripts/main_postprocess.py b/downscaling_ap5/main_scripts/main_postprocess.py index 2a921e9da7aafa13ec343fec21697c68792273eb..0e60fbd7fbf19a05c6e0c88cb65bc4b66cceca43 100644 --- a/downscaling_ap5/main_scripts/main_postprocess.py +++ b/downscaling_ap5/main_scripts/main_postprocess.py @@ -42,8 +42,11 @@ def main(parser_args): model_dir, plt_dir, norm_dir, model_type = get_model_info(model_base, parser_args.output_base_dir, parser_args.exp_name, parser_args.last, parser_args.model_type) - # create logger handlers + + # create output-directory and set name of netCDF-file to store inference data os.makedirs(plt_dir, exist_ok=True) + ncfile_out = os.path.join(plt_dir, "postprocessed_ds_test.nc") + # create logger handlers logfile = os.path.join(plt_dir, f"postprocessing_{parser_args.exp_name}.log") if os.path.isfile(logfile): os.remove(logfile) fh = logging.FileHandler(logfile) @@ -138,12 +141,15 @@ def main(parser_args): # perform denormalization y_pred = norm.denormalize(y_pred.squeeze(), varname=tar_varname) + # write inference data to netCDf + logger.info(f"Write inference data to netCDF-file '{ncfile_out}'") + ground_truth.name, y_pred.name = f"{tar_varname}_ref", f"{tar_varname}_fcst" + ds = xr.Dataset(xr.Dataset.merge(y_pred.to_dataset(), ground_truth.to_dataset())) + ds.to_netcdf(ncfile_out) + # start evaluation logger.info(f"Output data on test dataset successfully processed in {timer()-t0_train:.2f}s. Start evaluation...") - # create plot directory if required - os.makedirs(plt_dir, exist_ok=True) - # instantiate score engine for time evaluation (i.e. hourly time series of evalutaion metrics) score_engine = Scores(y_pred, ground_truth, ds_dict["norm_dims"][1:]) diff --git a/downscaling_ap5/main_scripts/main_train.py b/downscaling_ap5/main_scripts/main_train.py index ec0db870502d0f25795e528c9ad0a8407abc32a1..66970190c0e00b826422ada7c7ae0ec09ff5b4a4 100644 --- a/downscaling_ap5/main_scripts/main_train.py +++ b/downscaling_ap5/main_scripts/main_train.py @@ -82,6 +82,7 @@ def main(parser_args): # training data print("Start preparing training data...") t0_train = timer() + varnames_tar = list(ds_dict["predictands"]) fname_or_patt_train = get_dataset_filename(datadir, dataset, "train", ds_dict.get("laugmented", False)) # if fname_or_patt_train is a filename (string without wildcard), all data will be loaded into memory @@ -96,7 +97,6 @@ def main(parser_args): norm_obj=data_norm, norm_dims=norm_dims) data_norm = ds_obj.data_norm nsamples, shape_in = ds_obj.nsamples, (*ds_obj.data_dim[::-1], ds_obj.n_predictors) - varnames_tar = list(ds_obj.predictand_list) if named_targets else None tfds_train_size = ds_obj.dataset_size else: ds_train = xr.open_dataset(fname_or_patt_train) @@ -112,7 +112,6 @@ def main(parser_args): var_tar2in=ds_dict["var_tar2in"], named_targets=named_targets) nsamples, shape_in = da_train.shape[0], tfds_train.element_spec[0].shape[1:].as_list() - varnames_tar = list(tfds_train.element_spec[1].keys()) if named_targets else None tfds_train_size = da_train.nbytes if write_norm: @@ -154,6 +153,12 @@ def main(parser_args): compile_opts = handle_opt_utils(model, "get_compile_opts") model.compile(**compile_opts) + # copy configuration and normalization JSON-file to model-directory (incl. 
renaming) + filelist, filelist_new = [parser_args.conf_ds.name, parser_args.conf_md.name], [f"config_ds_{dataset}.json", f"config_{parser_args.model}.json"] + if not write_norm: + filelist.append(js_norm), filelist_new.append(os.path.basename(js_norm)) + copy_filelist(filelist, model_savedir, filelist_new) + # train model time_tracker = TimeHistory() steps_per_epoch = int(np.ceil(nsamples / ds_dict["batch_size"])) @@ -183,10 +188,10 @@ def main(parser_args): model.save(filepath=model_savedir) if callable(getattr(model, "plot_model", False)): - model.plot_model(model_savedir, show_shapes=True, show_layer_activations=True) + model.plot_model(model_savedir, show_shapes=True) else: plot_model(model, os.path.join(model_savedir, f"plot_{parser_args.exp_name}.png"), - show_shapes=True, show_layer_activations=True) + show_shapes=True) # final timing tend = timer() @@ -230,11 +235,6 @@ def main(parser_args): print("Finished job at {0}".format(dt.strftime(dt.now(), "%Y-%m-%d %H:%M:%S"))) - # copy configuration and normalization JSON-file to output-directory - filelist = [parser_args.conf_ds.name, parser_args.conf_md.name] - if not write_norm: filelist.append(js_norm) - copy_filelist(filelist, model_savedir) - if __name__ == "__main__": parser = argparse.ArgumentParser() diff --git a/downscaling_ap5/models/unet_model.py b/downscaling_ap5/models/unet_model.py index 48bc8e3b065479384d4b29aa993d26d622ab9bd8..60ee0ffdc30929bd822cd352fe5f747e8c002655 100644 --- a/downscaling_ap5/models/unet_model.py +++ b/downscaling_ap5/models/unet_model.py @@ -9,7 +9,7 @@ Methods to set-up U-net models incl. its building blocks. __author__ = "Michael Langguth" __email__ = "m.langguth@fz-juelich.de" __date__ = "2021-XX-XX" -__update__ = "2022-11-25" +__update__ = "2023-05-06" # import modules import os @@ -115,12 +115,13 @@ def decoder_block(inputs, skip_features, num_filters, kernel: tuple = (3, 3), st # The particular U-net -def sha_unet(input_shape: tuple, channels_start: int = 56, z_branch: bool = False, +def sha_unet(input_shape: tuple, n_predictands_dyn: int, channels_start: int = 56, z_branch: bool = False, concat_out: bool = False, tar_channels=["output_dyn", "output_z"]) -> Model: """ Builds up U-net model architecture adapted from Sha et al., 2020 (see https://doi.org/10.1175/JAMC-D-20-0057.1). :param input_shape: shape of input-data :param channels_start: number of channels to use as start in encoder blocks + :param n_predictands_dyn: number of dynamic target variables (dynamic output channels) :param z_branch: flag if z-branch is used. :param tar_channels: name of output/target channels (needed for associating losses during compilation) :param concat_out: boolean if output layers will be concatenated (disables named target channels!) @@ -141,7 +142,7 @@ def sha_unet(input_shape: tuple, channels_start: int = 56, z_branch: bool = Fals d2 = decoder_block(d1, s2, channels_start * 2) d3 = decoder_block(d2, s1, channels_start) - output_dyn = Conv2D(1, (1, 1), kernel_initializer="he_normal", name=tar_channels[0])(d3) + output_dyn = Conv2D(n_predictands_dyn, (1, 1), kernel_initializer="he_normal", name=tar_channels[0])(d3) if z_branch: print("Use z_branch...") output_static = Conv2D(1, (1, 1), kernel_initializer="he_normal", name=tar_channels[1])(d3) @@ -161,7 +162,7 @@ class UNET(keras.Model): U-Net submodel class: This subclass takes a U-Net implemented using Keras functional API as input to the instantiation.
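
The widened output head in the sha_unet hunk above is the core of the multi-predictand support: the dynamic head now emits n_predictands_dyn channels from one 1x1 convolution, while the optional z-branch keeps its own single-channel head. A minimal, standalone sketch of that idea (all tensor names and shapes here are illustrative, not the repository's exact sha_unet):

```python
import tensorflow as tf
from tensorflow.keras import layers, Model

def multi_predictand_head(d3, n_predictands_dyn: int, z_branch: bool = False,
                          tar_channels=("output_dyn", "output_z"), concat_out: bool = False):
    """1x1 convolution head with one output channel per dynamic predictand (sketch only)."""
    output_dyn = layers.Conv2D(n_predictands_dyn, (1, 1), kernel_initializer="he_normal",
                               name=tar_channels[0])(d3)
    if not z_branch:
        return output_dyn
    # separate single-channel head for the static target (e.g. surface topography)
    output_static = layers.Conv2D(1, (1, 1), kernel_initializer="he_normal",
                                  name=tar_channels[1])(d3)
    # concatenating the heads disables named target channels
    return layers.concatenate([output_dyn, output_static]) if concat_out else [output_dyn, output_static]

# toy usage: the Input tensor stands in for the last decoder feature map d3
d3 = tf.keras.Input(shape=(96, 128, 56))
model = Model(d3, multi_predictand_head(d3, n_predictands_dyn=2, z_branch=True, concat_out=True))
```

With the new config_ds_tier2_wind.json (predictands u_10m_tar, v_10m_tar and hsurf_tar, z-branch active), n_predictands_dyn would evaluate to 2.
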
""" - def __init__(self, unet_model: keras.Model, shape_in: List, varnames_tar, hparams: dict, savedir: str, + def __init__(self, unet_model: keras.Model, shape_in: List, varnames_tar: List, hparams: dict, savedir: str, exp_name: str = "unet_model"): super(UNET, self).__init__() @@ -170,6 +171,8 @@ class UNET(keras.Model): self.shape_in = shape_in self.varnames_tar = varnames_tar self.hparams = UNET.get_hparams_dict(hparams) + self.n_predictands = len(varnames_tar) # number of predictands + self.n_predictands_dyn = self.n_predictands - 1 if self.hparams["z_branch"] else self.n_predictands if self.hparams["l_embed"]: raise ValueError("Embedding is not implemented yet.") self.modelname = exp_name @@ -208,11 +211,12 @@ class UNET(keras.Model): def compile(self, **kwargs): # instantiate model - if self.varnames_tar: + if self.hparams["z_branch"]: # model has named branches (see also opt_dict in get_compile_opts) tar_channels = [f"{var}" for var in self.varnames_tar] - self.unet = self.unet(self.shape_in, z_branch=self.hparams["z_branch"], tar_channels=tar_channels) + self.unet = self.unet(self.shape_in, self.n_predictands_dyn, z_branch=True, + concat_out= False, tar_channels=tar_channels) else: - self.unet = self.unet(self.shape_in, z_branch=self.hparams["z_branch"], concat_out=True) + self.unet = self.unet(self.shape_in, self.n_predictands_dyn, z_branch=False) return self.unet.compile(**kwargs) # return super(UNET, self).compile(**kwargs) @@ -311,7 +315,7 @@ class UNET(keras.Model): hparams_dict = {"batch_size": 32, "lr": 5.e-05, "nepochs": 70, "z_branch": True, "loss_func": "mae", "loss_weights": [1.0, 1.0], "lr_decay": False, "decay_start": 5, "decay_end": 30, "lr_end": 1.e-06, "l_embed": False, "ngf": 56, "optimizer": "adam", "lscheduled_train": True, - "var_tar2in": ""} + "var_tar2in": "", "n_predictands": 1} return hparams_dict diff --git a/downscaling_ap5/models/wgan_model.py b/downscaling_ap5/models/wgan_model.py index ec60885d190518dfb190b346797b0d4dcfe47dfe..1e14df96c255b6fb96798099b3f2ba0176ad3eec 100644 --- a/downscaling_ap5/models/wgan_model.py +++ b/downscaling_ap5/models/wgan_model.py @@ -70,13 +70,14 @@ class WGAN(keras.Model): Class for Wassterstein GAN models """ - def __init__(self, generator: keras.Model, critic: keras.Model, shape_in: List, varnames_tar: List, hparams: dict, savedir: str, - exp_name: str = "wgan_model"): + def __init__(self, generator: keras.Model, critic: keras.Model, shape_in: List, varnames_tar: List, hparams: dict, + savedir: str, exp_name: str = "wgan_model"): """ Initiate Wasserstein GAN model :param generator: A generator model returning a data field :param critic: A critic model which returns a critic scalar on the data field :param shape_in: shape of input data + :param varnames_tar: list of target variables/predictands (incl. static variables, e.g. 
for z_branch) :param hparams: dictionary of hyperparameters :param exp_name: name of the WGAN experiment :param savedir: directory where checkpointed model will be saved @@ -89,16 +90,19 @@ class WGAN(keras.Model): self.hparams = WGAN.get_hparams_dict(hparams) if self.hparams["l_embed"]: raise ValueError("Embedding is not implemented yet.") + self.n_predictands = len(varnames_tar) # number of predictands + self.n_predictands_dyn = self.n_predictands - 1 if self.hparams["z_branch"] else self.n_predictands + print(f"Dynamic predictands: {self.n_predictands_dyn}, Predictands: {self.n_predictands}") self.modelname = exp_name if not os.path.isdir(savedir): os.makedirs(savedir, exist_ok=True) self.savedir = savedir # instantiate submodels - tar_shape = (*self.shape_in[:-1], 1) # critic only accounts for 1st channel (should be the downscaling target) - # instantiate models - self.generator = self.generator(self.shape_in, channels_start=self.hparams["ngf"], - z_branch=self.hparams["z_branch"], concat_out=True) + # instantiate model components (generator and critci) + self.generator = self.generator(self.shape_in, self.n_predictands, channels_start=self.hparams["ngf"], + concat_out=True, z_branch=self.hparams["z_branch"]) + tar_shape = (*self.shape_in[:-1], self.n_predictands_dyn) # critic only accounts for dynamic predictands self.critic = self.critic(tar_shape) # Unused attribute, but introduced for joint driver script with U-Net; to be solved with customized target vars @@ -191,16 +195,19 @@ class WGAN(keras.Model): for i in range(self.hparams["d_steps"]): with tf.GradientTape() as tape_critic: ist, ie = i * self.hparams["batch_size"], (i + 1) * self.hparams["batch_size"] - # critic only operates on first channel - predictands_critic = tf.expand_dims(predictands[ist:ie, :, :, 0], axis=-1) + # critic only operates on predictand channels + if self.n_predictands_dyn > 1: + predictands_critic = predictands[ist:ie, :, :, 0:self.n_predictands_dyn] + else: + predictands_critic = tf.expand_dims(predictands[ist:ie, :, :, 0], axis=-1) # generate (downscaled) data gen_data = self.generator(predictors[ist:ie, ...], training=True) # calculate critics for both, the real and the generated data - critic_gen = self.critic(gen_data[..., 0], training=True) + critic_gen = self.critic(gen_data[..., 0:self.n_predictands_dyn], training=True) critic_gt = self.critic(predictands_critic, training=True) # calculate the loss (incl. gradient penalty) c_loss = WGAN.critic_loss(critic_gt, critic_gen) - gp = self.gradient_penalty(predictands_critic, gen_data[..., 0:1]) + gp = self.gradient_penalty(predictands_critic, gen_data[..., 0:self.n_predictands_dyn]) d_loss = c_loss + self.hparams["gp_weight"] * gp @@ -213,7 +220,7 @@ class WGAN(keras.Model): # generate (downscaled) data gen_data = self.generator(predictors[-self.hparams["batch_size"]:, :, :, :], training=True) # get the critic and calculate corresponding generator losses (critic and reconstruction loss) - critic_gen = self.critic(gen_data[..., 0], training=True) + critic_gen = self.critic(gen_data[..., 0:self.n_predictands_dyn], training=True) cg_loss = WGAN.critic_gen_loss(critic_gen) rloss = self.recon_loss(predictands[-self.hparams["batch_size"]:, :, :, :], gen_data) @@ -253,7 +260,7 @@ class WGAN(keras.Model): :return: gradient penalty """ # get mixture of generated and ground truth data - alpha = tf.random.normal([self.hparams["batch_size"], 1, 1, 1], 0., 1.) + alpha = tf.random.normal([self.hparams["batch_size"], 1, 1, self.n_predictands_dyn], 0., 1.) 
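
Since the critic now receives all n_predictands_dyn channels, the interpolation weights for the gradient penalty are drawn per sample and channel as well. For orientation, a generic WGAN-GP penalty looks roughly like the sketch below; it uses uniform interpolation weights as in the textbook WGAN-GP formulation, whereas the hunk above keeps the repository's tf.random.normal draw, and critic stands for any Keras model mapping the target fields to a scalar score:

```python
import tensorflow as tf

def gradient_penalty_sketch(critic, real_data, gen_data, batch_size, n_predictands_dyn):
    # one interpolation weight per sample and channel, broadcast over the spatial dimensions
    alpha = tf.random.uniform([batch_size, 1, 1, n_predictands_dyn], 0., 1.)
    mix_data = real_data + alpha * (gen_data - real_data)
    with tf.GradientTape() as gp_tape:
        gp_tape.watch(mix_data)
        critic_mix = critic(mix_data, training=True)
    # penalize deviations of the critic's gradient norm from one
    grads = gp_tape.gradient(critic_mix, [mix_data])[0]
    grad_norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
    return tf.reduce_mean((grad_norm - 1.0) ** 2)
```
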
mix_data = real_data + alpha * (gen_data - real_data) with tf.GradientTape() as gp_tape: @@ -271,12 +278,8 @@ class WGAN(keras.Model): def recon_loss(self, real_data, gen_data): # initialize reconstruction loss rloss = 0. - # get number of output heads (=2 if z_branch is activated) - n = 1 - if self.hparams["z_branch"]: - n = 2 # get MAE for all output heads - for i in range(n): + for i in range(self.hparams["n_predictands"]): rloss += tf.reduce_mean(tf.abs(tf.squeeze(gen_data[..., i]) - real_data[..., i])) return rloss @@ -372,7 +375,7 @@ class WGAN(keras.Model): hparams_dict = {"batch_size": 32, "lr_gen": 1.e-05, "lr_critic": 1.e-06, "nepochs": 50, "z_branch": False, "lr_decay": False, "decay_start": 5, "decay_end": 10, "lr_gen_end": 1.e-06, "l_embed": False, "ngf": 56, "d_steps": 5, "recon_weight": 1000., "gp_weight": 10., "optimizer": "adam", - "lscheduled_train": True, "var_tar2in": ""} + "lscheduled_train": True, "var_tar2in": "", "n_predictands": 2} return hparams_dict diff --git a/downscaling_ap5/postprocess/postprocess.py b/downscaling_ap5/postprocess/postprocess.py index 1f33315df118a86af976a4a26a46454ec3f0b7d0..bac9c6ab6ce2ca33c88344537ed499a5d8fb883c 100644 --- a/downscaling_ap5/postprocess/postprocess.py +++ b/downscaling_ap5/postprocess/postprocess.py @@ -36,11 +36,11 @@ def get_model_info(model_base, output_base: str, exp_name: str, bool_last: bool os.path.join(output_base, model_name) norm_dir = model_base model_type = "wgan" - elif "unet" in exp_name: + elif "unet" in exp_name or "deepru" in exp_name: func_logger.debug(f"U-Net-modeltype detected.") model_dir, plt_dir = model_base, os.path.join(output_base, model_name) norm_dir = model_dir - model_type = "unet" + model_type = "unet" if "unet" in exp_name else "deepru" else: func_logger.debug(f"Model type could not be inferred from experiment name. 
Try my best by defaulting...") if not model_type: diff --git a/downscaling_ap5/test_scripts/demo_tfdataset_ap5.py b/downscaling_ap5/test_scripts/demo_tfdataset_ap5.py index 3c073a53d6c5f6cc71b3e4863f8d30afe42d264b..f01e84b2225229096c6339be7eed67354a6e904b 100644 --- a/downscaling_ap5/test_scripts/demo_tfdataset_ap5.py +++ b/downscaling_ap5/test_scripts/demo_tfdataset_ap5.py @@ -1,4 +1,3 @@ -<<<<<<< HEAD # SPDX-FileCopyrightText: 2023 Earth System Data Exploration (ESDE), Jülich Supercomputing Center (JSC) # # SPDX-License-Identifier: MIT @@ -74,381 +73,6 @@ def main(): # some statistics on memory usage print_gpu_usage("Final GPU memory: ") print_cpu_usage("Final CPU memory: ") - - -if __name__ == "__main__": - main() - -======= -import os, sys, glob -import argparse -from typing import List, Union -from operator import itemgetter -from functools import partial -import re -import gc -import random -from timeit import default_timer as timer -import numpy as np -import xarray as xr -import dask -from multiprocessing import Pool as ThreadPool -import tensorflow as tf -import tensorflow.keras as keras - -da_or_ds = Union[xr.DataArray, xr.Dataset] - -def main(): - parser = argparse.ArgumentParser("Program that test the MAELSTROM AP5 data pipeline") - parser.add_argument("--datadir", "-d", dest="datadir", type=str, - default="/p/scratch/deepacf/maelstrom/maelstrom_data/ap5_michael/preprocessed_tier2/monthly_files_copy/", - help="Directory where monthly netCDF-files are stored") - parser.add_argument("--file_pattern", "-f", dest="file_patt", type=str, default="downscaling_tier2_train_*.nc", help="Filename pattern to glob netCDF-files") - parser.add_argument("--nfiles_load", "-n", default=30, type=int, dest="nfiles_load", - help="Number of netCDF-files to load into memory (2x with prefetching).") - parser.add_argument("--lprefetch", "-lprefetch", dest="lprefetch", action="store_true", - help="Enable prefetching.") - parser.add_argument("--batch_size", "-b", dest="batch_size", default=192, type=int, - help="Batch size for TF dataset.") - args = parser.parse_args() - - # get data handler - ds_obj = StreamMonthlyNetCDF(args.datadir, args.file_patt, nfiles_merge=args.nfiles_load, - norm_dims=["time", "rlat", "rlon"]) - - # set-up TF dataset - ## map-funcs - tf_read_nc = lambda ind_set: tf.py_function(ds_obj.read_netcdf, [ind_set], tf.int64) - tf_choose_data = lambda il: tf.py_function(ds_obj.choose_data, [il], tf.bool) - tf_getdata = lambda i: tf.numpy_function(ds_obj.getitems, [i], tf.float32) - tf_split = lambda arr: (arr[..., 0:-ds_obj.n_predictands], arr[..., -ds_obj.n_predictands:]) - - ## process chain - tfds = tf.data.Dataset.range(int(ds_obj.nfiles_merged*6*10)).map(tf_read_nc) # 6*10 corresponds to (d_steps + 1)*n_epochs with d_steps=5, n_epochs=10 - if args.lprefetch: - tfds = tfds.prefetch(1) # .prefetch(1) ensures that one data subset (=n files) is prefetched - tfds = tfds.flat_map(lambda x: tf.data.Dataset.from_tensors(x).map(tf_choose_data)) - tfds = tfds.flat_map( - lambda x: tf.data.Dataset.range(ds_obj.samples_merged).shuffle(ds_obj.samples_merged) - .batch(args.batch_size, drop_remainder=True).map(tf_getdata)) - - tfds = tfds.prefetch(int(ds_obj.samples_merged)) - tfds = tfds.map(tf_split).repeat() - - t0 = timer() - for i, x in enumerate(tfds): - if i == int(ds_obj.nsamples/args.batch_size) - 1: - break - - print(f"Processing one epoch of data lasted {timer() - t0:.1f} seconds.") - - -class ZScore(object): - """ - Class for performing zscore-normalization and denormalization. 
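
The ZScore helper in this removed block (a leftover duplicate that sat below a merge-conflict marker) documents the normalization used throughout the pipeline: per-variable mean and standard deviation over the configured norm_dims, applied as (x - mu) / sigma. A condensed, standalone sketch of the same idea (not the project's exact class):

```python
import xarray as xr

class ZScoreSketch:
    """Z-score (de-)normalization over selected dimensions (condensed illustration)."""
    def __init__(self, norm_dims):
        self.norm_dims = norm_dims          # e.g. ["time", "rlat", "rlon"] as in the JSON configs
        self.mu, self.sigma = None, None

    def normalize(self, data: xr.Dataset) -> xr.Dataset:
        if self.mu is None or self.sigma is None:
            # compute the statistics once from the data if they were not provided
            self.mu, self.sigma = data.mean(self.norm_dims), data.std(self.norm_dims)
        return (data - self.mu) / self.sigma

    def denormalize(self, data_norm: xr.Dataset) -> xr.Dataset:
        if self.mu is None or self.sigma is None:
            raise ValueError("Normalization parameters mu and sigma are unknown.")
        return data_norm * self.sigma + self.mu
```
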
- Also computes normalization parameters from data if necessary. - """ - def __init__(self, norm_dims: List): - self.method = "zscore" - self.norm_dims = norm_dims - self.norm_stats = {"mu": None, "sigma": None} - - def get_required_stats(self, data: da_or_ds, **stats): - """ - Get required parameters for z-score normalization. They are either computed from the data - or can be parsed as keyword arguments. - :param data: the data to be (de-)normalized - :param stats: keyword arguments for mean (mu) and standard deviation (std) used for normalization - :return (mu, sigma): Parameters for normalization - """ - mu, std = stats.get("mu", self.norm_stats["mu"]), stats.get("sigma", self.norm_stats["sigma"]) - - if mu is None or std is None: - print("Retrieve mu and sigma from data...") - mu, std = data.mean(self.norm_dims), data.std(self.norm_dims) - # The following ensure that both parameters are computed in one graph! - # This significantly reduces memory footprint as we don't end up having data duplicates - # in memory due to multiple graphs (and also seem to enfore usage of data chunks as well) - mu, std = dask.compute(mu, std) - self.norm_stats = {"mu": mu, "sigma": std} - # else: - # print("Mu and sigma are parsed for (de-)normalization.") - - return mu, std - - def normalize(self, data, **stats): - """ - Perform z-score normalization on data. - Either computes the normalization parameters from the data or applies pre-existing ones. - :param data: Data array of interest - :param mu: mean of data for normalization - :param std: standard deviation of data for normalization - :return data_norm: normalized data - """ - mu, std = self.get_required_stats(data, **stats) - data_norm = (data - mu) / std - - return data_norm - - def denormalize(self, data, **stats): - """ - Perform z-score denormalization on data. - :param data: Data array of interest - :param mu: mean of data for denormalization - :param std: standard deviation of data for denormalization - :return data_norm: denormalized data - """ - if self.norm_stats["mu"] is None or self.norm_stats["std"] is None: - raise ValueError("Normalization parameters mu and std are unknown.") - else: - norm_stats = self.get_required_stats(data, **stats) - - - data_denorm = data * norm_stats["std"] + norm_stats["mu"] - - return data_denorm - - -class StreamMonthlyNetCDF(object): - """ - Data handler for monthly netCDF-files which provides methods for setting up - a TF dataset that is too large to fit into memory. - """ - def __init__(self, datadir, patt: str, nfiles_merge: int, sample_dim: str = "time", selected_predictors: List = None, - selected_predictands: List = None, var_tar2in: str = None, norm_dims: List = None, norm_obj=None): - """ - Initialize data handler. - :param datadir: path to directory where netCDF-files are located - :param patt: file name pattern for globbing - :param nfiles_merge: number of netCDF-files that get loaded into memory - :param sample_dim: dimension from which samples will be drawn - :param selected_predictors: list of predictor variable names - :param selected_predictands: list of predictand variable names - :param var_tar2in: target variable that should be added to input as well (e.g. 
surface topography) - :param norm_dims: dimenions over which data will be normalized - :param norm_obj: normalization object - """ - self.data_dir = datadir - self.file_list = patt - self.nfiles = len(self.file_list) - self.file_list_random = random.sample(self.file_list, self.nfiles) - self.nfiles2merge = nfiles_merge - self.nfiles_merged = int(self.nfiles / self.nfiles2merge) - self.samples_merged = self.get_samples_per_merged_file() - self.predictor_list = selected_predictors - self.predictand_list = selected_predictands - self.n_predictands, self.n_predictors = len(self.predictand_list), len(self.predictor_list) - self.all_vars = self.predictor_list + self.predictand_list - self.ds_all = xr.open_mfdataset(list(self.file_list), decode_cf=False, cache=False) # , parallel=True) - self.var_tar2in = var_tar2in - if self.var_tar2in is not None: - self.n_predictors += len(to_list(self.var_tar2in)) - self.sample_dim = sample_dim - self.nsamples = self.ds_all.dims[sample_dim] - self.data_dim = self.get_data_dim() - t0 = timer() - self.normalization_time = -999. - if norm_obj is None: - print("Start computing normalization parameters.") - self.data_norm = ZScore(norm_dims) # TO-DO: Allow for arbitrary normalization - self.norm_params = self.data_norm.get_required_stats(self.ds_all) - self.normalization_time = timer() - t0 - else: - self.data_norm = norm_obj - self.norm_params = norm_obj.norm_stats - - self.data_loaded = [xr.Dataset, xr.Dataset] - self.i_loaded = 0 - self.data_now = None - self.pool = ThreadPool(10) # To-Do: remove hard-coded number of threads (-> support contact) - - def __len__(self): - return self.nsamples - - def getitems(self, indices): - da_now = self.data_now.isel({self.sample_dim: indices}).to_array("variables") - if self.var_tar2in is not None: - da_now = xr.concat([da_now, da_now.sel({"variables": self.var_tar2in})], dim="variables") - return da_now.transpose(..., "variables") - - def get_data_dim(self): - """ - Retrieve the dimensionality of the data to be handled, i.e. without sample_dim which will be batched in a - data stream. 
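
The new config_ds_tier2_wind.json sets "var_tar2in": "hsurf_tar", and getitems above appends exactly that target variable to the predictor channels via xr.concat. A small toy illustration of this channel handling (the *_in variable names are made up; the *_tar names follow the new config):

```python
import numpy as np
import xarray as xr

coords = {"time": np.arange(4), "rlat": np.arange(8), "rlon": np.arange(8)}
ds = xr.Dataset({v: (("time", "rlat", "rlon"), np.random.rand(4, 8, 8))
                 for v in ["t_2m_in", "hsurf_in", "u_10m_tar", "v_10m_tar", "hsurf_tar"]},
                coords=coords)

da = ds.to_array("variables")                  # stack all variables into one channel dimension
var_tar2in = "hsurf_tar"                       # static target reused as an extra predictor
da = xr.concat([da, da.sel({"variables": var_tar2in})], dim="variables")
da = da.transpose(..., "variables")            # channels-last layout expected by the models
print(da.sizes["variables"])                   # 5 variables + 1 duplicated channel -> 6
```

This duplication is also why n_predictors is incremented by len(to_list(var_tar2in)) in the data handler.
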
- :return: tuple of data dimensions - """ - # get existing dimension names and remove sample_dim - dimnames = list(self.ds_all.coords) - dimnames.remove(self.sample_dim) - - # get the dimensionality of the data of interest - all_dims = dict(self.ds_all.dims) - data_dim = itemgetter(*dimnames)(all_dims) - - return data_dim - - def get_samples_per_merged_file(self): - nsamples_merged = [] - - for i in range(self.nfiles_merged): - file_list_now = self.file_list_random[i * self.nfiles2merge: (i + 1) * self.nfiles2merge] - ds_now = xr.open_mfdataset(list(file_list_now), decode_cf=False) - nsamples_merged.append(ds_now.dims["time"]) # To-Do avoid hard-coding - - return max(nsamples_merged) - - @property - def data_dir(self): - return self._data_dir - - @data_dir.setter - def data_dir(self, datadir): - if not os.path.isdir(datadir): - raise NotADirectoryError(f"Parsed data directory '{datadir}' does not exist.") - - self._data_dir = datadir - - @property - def file_list(self): - return self._file_list - - @file_list.setter - def file_list(self, patt): - patt = patt if patt.endswith(".nc") else f"{patt}.nc" - files = glob.glob(os.path.join(self.data_dir, patt)) - - if not files: - raise FileNotFoundError(f"Could not find any files with pattern '{patt}' under '{self.data_dir}'.") - - self._file_list = list( - np.asarray(sorted(files, key=lambda s: int(re.search(r'\d+', os.path.basename(s)).group())))) - - @property - def nfiles2merge(self): - return self._nfiles2merge - - @nfiles2merge.setter - def nfiles2merge(self, n2merge): - #n = find_closest_divisor(self.nfiles, n2merge) - #if n != n2merge: - # print(f"{n2merge} is not a divisor of the total number of files. Value is changed to {n}") - if self.nfiles%n2merge != 0: - raise ValueError(f"Chosen number of files ({n2merge:d}) to read must be a divisor " + - f" of total number of files ({self.nfiles:d}).") - - self._nfiles2merge = n2merge - - @property - def sample_dim(self): - return self._sample_dim - - @sample_dim.setter - def sample_dim(self, sample_dim): - if not sample_dim in self.ds_all.dims: - raise KeyError(f"Could not find dimension '{sample_dim}' in data.") - - self._sample_dim = sample_dim - - @property - def predictor_list(self): - return self._predictor_list - - @predictor_list.setter - def predictor_list(self, selected_predictors: List): - """ - Initalizes predictor list. In case that selected_predictors is set to None, all variables with suffix `_in` in their names are selected. - In case that a list of selected_predictors is parsed, their availability is checked. - :param selected_predictors: list of predictor variables or None - """ - self._predictor_list = self.check_and_choose_vars(selected_predictors, "_in") - - @property - def predictand_list(self): - return self._predictand_list - - @predictand_list.setter - def predictand_list(self, selected_predictands: List): - self._predictand_list = self.check_and_choose_vars(selected_predictands, "_tar") - - def check_and_choose_vars(self, var_list: List[str], suffix: str = "*"): - """ - Checks list of variables for availability or retrieves all variables named with a given suffix (for var_list = None) - :param var_list: list of predictor variables or None - :param suffix: optional suffix of variables to selected. Only effective if var_list is None. 
- :return selected_vars: list of selected variables - """ - ds_test = xr.open_dataset(self.file_list[0]) - all_vars = list(ds_test.variables) - - if var_list is None: - selected_vars = [var for var in all_vars if var.endswith(suffix)] - else: - stat_list = [var in all_vars for var in var_list] - if all(stat_list): - selected_vars = var_list - else: - miss_inds = [i for i, x in enumerate(stat_list) if x] - miss_vars = [var_list[i] for i in miss_inds] - raise ValueError(f"Could not find the following variables in the dataset: {*miss_vars,}") - - return selected_vars - - @staticmethod - def _process_one_netcdf(fname, data_norm, engine: str = "netcdf4", var_list: List = None, **kwargs): - with xr.open_dataset(fname, decode_cf=False, engine=engine, **kwargs) as ds_now: - if var_list: ds_now = ds_now[var_list] - ds_now = StreamMonthlyNetCDF._preprocess_ds(ds_now, data_norm) - ds_now = ds_now.load() - return ds_now - - @staticmethod - def _preprocess_ds(ds, data_norm): - ds = data_norm.normalize(ds) - return ds.astype("float32") - - def _read_mfdataset(self, files, **kwargs): - t0 = timer() - # parallel processing of files incl. normalization - datasets = self.pool.map(partial(self._process_one_netcdf, data_norm=self.data_norm, **kwargs), files) - ds_all = xr.concat(datasets, dim=self.sample_dim) - # clean-up - del datasets - gc.collect() - # timing - print(f"Reading dataset of {len(files)} files took {timer() - t0:.2f}s.") - - return ds_all - - def read_netcdf(self, set_ind): - set_ind = tf.keras.backend.get_value(set_ind) - set_ind = int(str(set_ind).lstrip("b'").rstrip("'")) - set_ind = int(set_ind%self.nfiles_merged) - il = int((self.i_loaded + 1)%2) - file_list_now = self.file_list_random[set_ind * self.nfiles2merge:(set_ind + 1) * self.nfiles2merge] - # read the normalized data into memory - # ds_now = xr.open_mfdataset(list(file_list_now), decode_cf=False, data_vars=self.all_vars, - # preprocess=partial(self._preprocess_ds, data_norm=self.data_norm), - # parallel=True).load() - self.data_loaded[il] = self._read_mfdataset(file_list_now, var_list=self.all_vars).copy() - nsamples = self.data_loaded[il].dims[self.sample_dim] - if nsamples < self.samples_merged: - t0 = timer() - add_samples = self.samples_merged - nsamples - add_inds = random.sample(range(nsamples), add_samples) - ds_add = self.data_loaded[il].isel({self.sample_dim: add_inds}) - ds_add[self.sample_dim] = ds_add[self.sample_dim] + 1. 
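
For reference, the sample-padding step in this (removed) duplicate mirrors what the live handle_data_class.py does: each merged subset is topped up to the common samples_merged length by re-drawing random samples, so every subset exposes the same number of samples to the TF dataset. A condensed sketch with generic names (it assumes a numeric sample coordinate, as with decode_cf=False, and a shortfall smaller than the number of available samples):

```python
import random
import xarray as xr

def pad_to_length(ds: xr.Dataset, samples_merged: int, sample_dim: str = "time") -> xr.Dataset:
    """Duplicate randomly chosen samples until the subset has samples_merged entries."""
    nsamples = ds.sizes[sample_dim]
    if nsamples >= samples_merged:
        return ds
    add_inds = random.sample(range(nsamples), samples_merged - nsamples)
    ds_add = ds.isel({sample_dim: add_inds})
    # shift the duplicated coordinate values slightly to keep the sample index unique
    ds_add[sample_dim] = ds_add[sample_dim] + 1.
    return xr.concat([ds, ds_add], dim=sample_dim)
```
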
- self.data_loaded[il] = xr.concat([self.data_loaded[il], ds_add], dim=self.sample_dim) - print(f"Appending data with {add_samples:d} samples took {timer() - t0:.2f}s.") - - print(f"Currently loaded dataset has {self.data_loaded[il].dims[self.sample_dim]} samples.") - - return il - - def choose_data(self, il): - self.data_now = self.data_loaded[il] - self.i_loaded = il - return True if __name__ == "__main__": main() ->>>>>>> develop diff --git a/downscaling_ap5/utils/other_utils.py b/downscaling_ap5/utils/other_utils.py index 5cd200181e56a00410a6e5a5b200089e5ea408b3..2853df1e88789774b1821e482a81df47f9443ab5 100644 --- a/downscaling_ap5/utils/other_utils.py +++ b/downscaling_ap5/utils/other_utils.py @@ -337,22 +337,30 @@ def get_max_memory_usage(): return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1000 -def copy_filelist(file_list: List, dest: str, labort: bool = True): +def copy_filelist(file_list: List, dest_dir: str, file_list_dest: List = None, labort: bool = True): """ Copy a list of files to another directory :param file_list: list of files to copy - :param dest: target directory to which files will be copied + :param dest_dir: target directory to which files will be copied + :param file_list_dest: optional list of file names at the destination (must match the length of file_list) :param labort: flag to trigger raising of an error (if False, only Warning-messages will be printed) """ file_list = to_list(file_list) - if not os.path.isdir(dest) and labort: + if not os.path.isdir(dest_dir) and labort: - raise NotADirectoryError(f"Cannot copy to non-existing directory '{dest}'.") + raise NotADirectoryError(f"Cannot copy to non-existing directory '{dest_dir}'.") - elif not os.path.isdir(dest) and not labort: + elif not os.path.isdir(dest_dir) and not labort: - print(f"WARNING: Target directory for copying '{dest}' does not exist. Skip copy process...") + print(f"WARNING: Target directory for copying '{dest_dir}' does not exist. Skip copy process...") + return - for f in file_list: + if file_list_dest is None: + dest = [dest_dir] * len(file_list) + else: + assert len(file_list) == len(file_list_dest), f"Length of filelist to copy ({len(file_list)})" + \ + f" and of filelist at destination ({len(file_list_dest)}) differ." + dest = [os.path.join(dest_dir, f_dest) for f_dest in file_list_dest] + + for i, f in enumerate(file_list): if os.path.isfile(f): - shutil.copy(f, dest) + shutil.copy(f, dest[i]) else: if labort: raise FileNotFoundError(f"Could not find file '{f}'. Error will be raised.")
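
Moving the configuration copy in main_train.py to just before training and extending copy_filelist with per-file renaming means the experiment directory carries the dataset and model configs under standardized names from the start. A hypothetical call, assuming the utils package is on PYTHONPATH as in the batch scripts and with made-up file names:

```python
import os
from other_utils import copy_filelist  # assumes downscaling_ap5/utils is on PYTHONPATH

filelist = ["my_exp/ds_config.json", "my_exp/model_config.json"]   # illustrative source files
filelist_new = ["config_ds_tier2.json", "config_unet.json"]        # standardized names at the destination
model_savedir = "trained_models/my_experiment"                     # illustrative model directory

os.makedirs(model_savedir, exist_ok=True)
copy_filelist(filelist, model_savedir, filelist_new)
# without file_list_dest the files keep their original names:
# copy_filelist(filelist, model_savedir)
```
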