diff --git a/video_prediction_savp/HPC_scripts/DataExtraction.sh b/video_prediction_savp/HPC_scripts/DataExtraction.sh
index 5c5efb489397d05fa08ddd1021734cecbc16b517..3d868b9cc923e9e0bcd0d1e1e24e69b9e8705a4f 100755
--- a/video_prediction_savp/HPC_scripts/DataExtraction.sh
+++ b/video_prediction_savp/HPC_scripts/DataExtraction.sh
@@ -40,6 +40,7 @@ year="2010"
 # Run data extraction
 srun python ../../workflow_parallel_frame_prediction/DataExtraction/mpi_stager_v2.py --source_dir ${source_dir}/${year} --destination_dir ${dest_dir}/${year}/
-# 2tier pystager
-#srun python ../../workflow_parallel_frame_prediction/DataExtraction/main_single_master.py --source_dir ${source_dir}/${year}/ --destination_dir ${dest_dir}/${year}/
+
+# 2tier pystager
+#srun python ../../workflow_parallel_frame_prediction/DataExtraction/main_single_master.py --source_dir /p/fastdata/slmet/slmet111/met_data/ecmwf/era5/nc/${year}/ --destination_dir ${SAVE_DIR}/extractedData/${year}
diff --git a/video_prediction_savp/HPC_scripts/DataPreprocess.sh b/video_prediction_savp/HPC_scripts/DataPreprocess.sh
index 420c7884577434e66a38661bac5ea346884858ce..a9bd46413abaca1d2df3670b0acbcc8d32c919a6 100755
--- a/video_prediction_savp/HPC_scripts/DataPreprocess.sh
+++ b/video_prediction_savp/HPC_scripts/DataPreprocess.sh
@@ -19,8 +19,8 @@ fi
 source ../env_setup/modules_preprocess.sh
 
-source_dir=/p/scratch/deepacf/video_prediction_shared_folder/extractedData
-destination_dir=/p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/era5-Y2015to2017M01to12
+source_dir=${SAVE_DIR}/extractedData
+destination_dir=${SAVE_DIR}/preprocessedData/era5-Y2015to2017M01to12
 script_dir=`pwd`
 
 declare -a years=("2222"
diff --git a/video_prediction_savp/HPC_scripts/DataPreprocess_to_tf.sh b/video_prediction_savp/HPC_scripts/DataPreprocess_to_tf.sh
index 273863179bdfb673ff79991708aa69fc415b8997..83b61e6fbbfc1ef34f6421ef130be71384ffa47a 100755
--- a/video_prediction_savp/HPC_scripts/DataPreprocess_to_tf.sh
+++ b/video_prediction_savp/HPC_scripts/DataPreprocess_to_tf.sh
@@ -6,8 +6,8 @@
 #SBATCH --cpus-per-task=1
 #SBATCH --output=DataPreprocess_to_tf-out.%j
 #SBATCH --error=DataPreprocess_to_tf-err.%j
-#SBATCH --time=00:20:00
-#SBATCH --partition=devel
+#SBATCH --time=01:20:00
+#SBATCH --partition=batch
 #SBATCH --mail-type=ALL
 #SBATCH --mail-user=b.gong@fz-juelich.de
diff --git a/video_prediction_savp/HPC_scripts/generate_era5.sh b/video_prediction_savp/HPC_scripts/generate_era5.sh
index 15eefec36b15e26c98b1a78e93cab1e8d8317c8d..3d5658bde3f942eee220f25aa13d8e2814beba12 100755
--- a/video_prediction_savp/HPC_scripts/generate_era5.sh
+++ b/video_prediction_savp/HPC_scripts/generate_era5.sh
@@ -34,11 +34,12 @@ source_dir=/p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/er
 checkpoint_dir=/p/scratch/deepacf/video_prediction_shared_folder/models/era5-Y2009Y2012to2013Y2016Y2019M01to12-160x128-2970N1500W-T2_MSL_gph500/era5-Y2010Y2022M01to12-160x128-2970N1500W-T2_MSL_gph500/era5-Y2010Y2022M01to12-160x128-2970N1500W-T2_MSL_gph500/era5-Y2015to2017M01to12-160x128-2970N1500W-T2_MSL_gph500/
 results_dir=/p/scratch/deepacf/video_prediction_shared_folder/results/era5-Y2009Y2012to2013Y2016Y2019M01to12-160x128-2970N1500W-T2_MSL_gph500/era5-Y2010Y2022M01to12-160x128-2970N1500W-T2_MSL_gph500/era5-Y2010Y2022M01to12-160x128-2970N1500W-T2_MSL_gph500/era5-Y2015to2017M01to12-160x128-2970N1500W-T2_MSL_gph500/
-model=mcnet
+
+
 # run postprocessing/generation of model results including evaluation metrics
 srun python -u ../scripts/generate_transfer_learning_finetune.py \
---input_dir ${source_dir}/tfrecords --dataset_hparams sequence_length=20 --checkpoint ${checkpoint_dir}/${model} \
---mode test --results_dir ${results_dir} --batch_size 2 --dataset era5 > generate_era5-out.out
+--input_dir ${source_dir}/tfrecords --dataset_hparams sequence_length=20 --checkpoint ${checkpoint_dir}/${model}/${hyperdir} \
+--mode test --results_dir ${results_dir}/${model} --batch_size 2 --dataset era5 > generate_era5-out.out
 
 #srun python scripts/train.py --input_dir data/era5 --dataset era5 --model savp --model_hparams_dict hparams/kth/ours_savp/model_hparams.json --output_dir logs/era5/ours_savp
diff --git a/video_prediction_savp/HPC_scripts/hyperparam_setup.sh b/video_prediction_savp/HPC_scripts/hyperparam_setup.sh
new file mode 100644
index 0000000000000000000000000000000000000000..a6c24a062ca30b06879641806d771beacc4b34f8
--- /dev/null
+++ b/video_prediction_savp/HPC_scripts/hyperparam_setup.sh
@@ -0,0 +1,12 @@
+#!/usr/bin/env bash
+
+# for choosing the model
+export model=convLSTM
+export model_hparams=../hparams/era5/${model}/model_hparams.json
+
+# create a subfolder named after the creation time and the user name; it serves as a hyperparameter-tuning folder and avoids overwriting a previously trained model that used different hyperparameters
+export hyperdir="$(date +"%Y%m%dT%H%M")_"$USER""
+
+echo "model: ${model}"
+echo "hparams: ${model_hparams}"
+echo "experiment dir: ${hyperdir}"
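Since generate_era5.sh above consumes the exported ${model} and ${hyperdir} variables, this script is presumably sourced before submission. For readers unfamiliar with the bash idiom, a hypothetical Python equivalent of the hyperdir naming scheme (names here are illustrative, not part of the commit):

    from datetime import datetime
    import getpass

    # same pattern as the bash line above: <timestamp>_<user>
    hyperdir = "{}_{}".format(datetime.now().strftime("%Y%m%dT%H%M"), getpass.getuser())
    print(hyperdir)  # e.g. 20200724T1433_gong1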
diff --git a/video_prediction_savp/HPC_scripts/reset_dirs.sh b/video_prediction_savp/HPC_scripts/reset_dirs.sh
new file mode 100644
index 0000000000000000000000000000000000000000..8de5247e044150d1c01eccfa512b9ae1c0e4cdfa
--- /dev/null
+++ b/video_prediction_savp/HPC_scripts/reset_dirs.sh
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+
+sed -i "s|source_dir=.*|source_dir=${SAVE_DIR}preprocessedData/|g" DataPreprocess_to_tf.sh
+sed -i "s|destination_dir=.*|destination_dir=${SAVE_DIR}preprocessedData/|g" DataPreprocess_to_tf.sh
+
+sed -i "s|source_dir=.*|source_dir=${SAVE_DIR}preprocessedData/|g" train_era5.sh
+sed -i "s|destination_dir=.*|destination_dir=${SAVE_DIR}models/|g" train_era5.sh
+
+sed -i "s|source_dir=.*|source_dir=${SAVE_DIR}preprocessedData/|g" generate_era5.sh
+sed -i "s|checkpoint_dir=.*|checkpoint_dir=${SAVE_DIR}models/|g" generate_era5.sh
+sed -i "s|results_dir=.*|results_dir=${SAVE_DIR}results/|g" generate_era5.sh
diff --git a/video_prediction_savp/HPC_scripts/train_era5.sh b/video_prediction_savp/HPC_scripts/train_era5.sh
index 7b86d6a1ae13243fe4dc56575ed3c2a7fd1d528b..f68b590767e07354ea891c968a06b9ddf616a045 100755
--- a/video_prediction_savp/HPC_scripts/train_era5.sh
+++ b/video_prediction_savp/HPC_scripts/train_era5.sh
@@ -7,12 +7,13 @@
 #SBATCH --output=train_era5-out.%j
 #SBATCH --error=train_era5-err.%j
 #SBATCH --time=00:20:00
-#SBATCH --gres=gpu:1
+#SBATCH --gres=gpu:2
 #SBATCH --partition=develgpus
 #SBATCH --mail-type=ALL
 #SBATCH --mail-user=b.gong@fz-juelich.de
 
 ##jutil env activate -p cjjsc42
+
 # Name of virtual environment
 VIRT_ENV_NAME="vp"
@@ -29,15 +30,18 @@ if [ -z ${VIRTUAL_ENV} ]; then
    fi
 fi
+
+
+
 # declare directory-variables which will be modified appropriately during Preprocessing (invoked by mpi_split_data_multi_years.py)
 source_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/era5-Y2015to2017M01to12-160x128-2970N1500W-T2_MSL_gph500
 destination_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/results/era5-Y2015to2017M01to12-160x128-2970N1500W-T2_MSL_gph500
 
 # for choosing the model
 model=convLSTM
-model_hparams=../hparams/era5/model_hparams.json
+model_hparams=../hparams/era5/${model}/model_hparams.json
 
 # run training
 srun python ../scripts/train_dummy.py --input_dir ${source_dir}/tfrecords/ --dataset era5 --model ${model} --model_hparams_dict ${model_hparams} --output_dir ${destination_dir}/${model}/
+
-#srun python scripts/train.py --input_dir data/era5 --dataset era5 --model savp --model_hparams_dict hparams/kth/ours_savp/model_hparams.json --output_dir logs/era5/ours_savp
diff --git a/video_prediction_savp/env_setup/create_env.sh b/video_prediction_savp/env_setup/create_env.sh
old mode 100755
new mode 100644
index df1c8ed91d3f6c75b7e111bcc0ac76afc6460f07..ad388826caf1d077c1c6434acae29d6cbaa9c6fc
--- a/video_prediction_savp/env_setup/create_env.sh
+++ b/video_prediction_savp/env_setup/create_env.sh
@@ -1,5 +1,11 @@
 #!/usr/bin/env bash
+#
+# __authors__ = Bing Gong, Michael Langguth
+# __date__ = '2020_07_24'
+# This script can be used for setting up the virtual environment needed for the ambs-project
+# or to simply activate it.
+#
 # some first sanity checks
 if [[ ${BASH_SOURCE[0]} == ${0} ]]; then
   echo "ERROR: 'create_env.sh' must be sourced, i.e. execute by prompting 'source create_env.sh [virt_env_name]'"
@@ -43,6 +49,11 @@ if [[ "${HOST_NAME}" == hdfml* || "${HOST_NAME}" == juwels* ]]; then
   USER_EMAIL=$(jutil user show -o json | grep email | cut -f2 -d':' | cut -f1 -d',' | cut -f2 -d'"')
   #replace the email in sbatch script with the USER_EMAIL
   sed -i "s/--mail-user=.*/--mail-user=$USER_EMAIL/g" ../HPC_scripts/*.sh
+  # load modules and check for their availability
+  echo "***** Checking modules required during the workflow... *****"
+  source ${ENV_SETUP_DIR}/modules_preprocess.sh
+  source ${ENV_SETUP_DIR}/modules_train.sh
+
 elif [[ "${HOST_NAME}" == "zam347" ]]; then
   unset PYTHONPATH
 fi
@@ -62,16 +73,9 @@ if [[ "$ENV_EXIST" == 0 ]]; then
   if [[ "${HOST_NAME}" == hdfml* || "${HOST_NAME}" == juwels* ]]; then
     # check module availability for the first time on known HPC-systems
     echo "***** Start installing additional Python modules with pip... *****"
-    #pip install --upgrade pip
-    pip3 install --no-cache-dir -r ${ENV_SETUP_DIR}/requirements.txt
+    pip3 install --no-cache-dir --ignore-installed -r ${ENV_SETUP_DIR}/requirements.txt
     #pip3 install --user netCDF4
     #pip3 install --user numpy
-
-    # check for availability of required modules
-    echo "***** Checking modules required for Preprocessing... *****"
-    source ${ENV_SETUP_DIR}/modules_preprocess.sh
-    echo "***** Checking modules required for training and Postprocessing... *****"
-    source ${ENV_SETUP_DIR}/modules_train.sh
   elif [[ "${HOST_NAME}" == "zam347" ]]; then
     echo "***** Start installing additional Python modules with pip... *****"
     pip3 install --upgrade pip
@@ -83,7 +87,6 @@ if [[ "$ENV_EXIST" == 0 ]]; then
     pip3 install tensorflow-gpu==1.13.1
   fi
   # expand PYTHONPATH...
-  export PYTHONPATH=${WORKING_DIR}/external_package/hickle/lib/python3.6/site-packages:$PYTHONPATH >> ${activate_virt_env}
   export PYTHONPATH=${WORKING_DIR}:$PYTHONPATH >> ${activate_virt_env}
   #export PYTHONPATH=/p/home/jusers/${USER}/juwels/.local/bin:$PYTHONPATH
   export PYTHONPATH=${WORKING_DIR}/external_package/lpips-tensorflow:$PYTHONPATH >> ${activate_virt_env}
@@ -91,12 +94,9 @@ if [[ "$ENV_EXIST" == 0 ]]; then
   if [[ "${HOST_NAME}" == hdfml* || "${HOST_NAME}" == juwels* ]]; then
     export PYTHONPATH=${ENV_DIR}/lib/python3.6/site-packages:$PYTHONPATH >> ${activate_virt_env}
   fi
-
-  # ...and ensure that this also done when the
   echo "" >> ${activate_virt_env}
   echo "# Expand PYTHONPATH..." >> ${activate_virt_env}
-  echo "export PYTHONPATH=${WORKING_DIR}/external_package/hickle/lib/python3.6/site-packages:\$PYTHONPATH" >> ${activate_virt_env}
   echo "export PYTHONPATH=${WORKING_DIR}:\$PYTHONPATH" >> ${activate_virt_env}
   #export PYTHONPATH=/p/home/jusers/${USER}/juwels/.local/bin:\$PYTHONPATH
   echo "export PYTHONPATH=${WORKING_DIR}/external_package/lpips-tensorflow:\$PYTHONPATH" >> ${activate_virt_env}
@@ -109,4 +109,3 @@ elif [[ "$ENV_EXIST" == 1 ]]; then
   source ${ENV_DIR}/bin/activate
 fi
-
diff --git a/video_prediction_savp/hparams/era5/convLSTM/model_hparams.json b/video_prediction_savp/hparams/era5/convLSTM/model_hparams.json
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/video_prediction_savp/hparams/era5/model_hparams.json b/video_prediction_savp/hparams/era5/model_hparams.json
deleted file mode 100644
index b121ee2f005b6db753b2536deb804204dd41b78d..0000000000000000000000000000000000000000
--- a/video_prediction_savp/hparams/era5/model_hparams.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-    "batch_size": 8,
-    "lr": 0.001,
-    "nz": 16,
-    "max_steps":500,
-    "context_frames":10,
-    "sequence_length":20
-
-}
-
-
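Note that the per-model hparams file hparams/era5/convLSTM/model_hparams.json is added empty (e69de29 is git's empty blob). As a sketch only, with values assumed from the deleted file above and the max_epochs default introduced in vanilla_convLSTM_model.py further below, not something this commit ships, its content could later be produced like this:

    import json

    # assumed example values, mirroring the model defaults; the commit itself adds an empty file
    hparams = {"batch_size": 8, "lr": 0.001, "max_epochs": 3000,
               "context_frames": 10, "sequence_length": 20}
    with open("model_hparams.json", "w") as f:
        json.dump(hparams, f, indent=4, sort_keys=True)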
diff --git a/video_prediction_savp/scripts/train_dummy.py b/video_prediction_savp/scripts/train_dummy.py
index 2f892f69c901f1eaa0a7ce2e57a3d0f6f131a7f9..9c621bbfc46dcf8cebb77b006604ed2f6f50056f 100644
--- a/video_prediction_savp/scripts/train_dummy.py
+++ b/video_prediction_savp/scripts/train_dummy.py
@@ -11,6 +11,14 @@ import time
 import numpy as np
 import tensorflow as tf
 from video_prediction import datasets, models
+import matplotlib.pyplot as plt
+from json import JSONEncoder
+import pickle as pkl
+class NumpyArrayEncoder(JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, np.ndarray):
+            return obj.tolist()
+        return JSONEncoder.default(self, obj)
 
 
 def add_tag_suffix(summary, tag_suffix):
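Usage sketch for the encoder added above (illustrative, not part of the commit): json.dumps() cannot serialize numpy arrays on its own, so the encoder converts them to nested lists.

    import json
    import numpy as np

    # assumes the NumpyArrayEncoder class above is in scope
    arr = np.arange(4).reshape(2, 2)
    print(json.dumps({"losses": arr}, cls=NumpyArrayEncoder))
    # -> {"losses": [[0, 1], [2, 3]]}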
" - "default is logs_dir/model_fname, where model_fname consists of " - "information from model and model_hparams") - parser.add_argument("--output_dir_postfix", default="") - parser.add_argument("--checkpoint", help="directory with checkpoint or checkpoint name (e.g. checkpoint_dir/model-200000)") - parser.add_argument("--resume", action='store_true', help='resume from lastest checkpoint in output_dir.') - - parser.add_argument("--dataset", type=str, help="dataset class name") - parser.add_argument("--model", type=str, help="model class name") - parser.add_argument("--model_hparams", type=str, help="a string of comma separated list of model hyperparameters") - parser.add_argument("--model_hparams_dict", type=str, help="a json file of model hyperparameters") - - # parser.add_argument("--aggregate_nccl", type=int, default=0, help="whether to use nccl or cpu for gradient aggregation in multi-gpu training") - parser.add_argument("--gpu_mem_frac", type=float, default=0, help="fraction of gpu memory to use") - parser.add_argument("--seed", type=int) - - args = parser.parse_args() - - if args.seed is not None: - tf.set_random_seed(args.seed) - np.random.seed(args.seed) - random.seed(args.seed) - - if args.output_dir is None: +def generate_output_dir(output_dir, model,model_hparams,logs_dir,output_dir_postfix): + if output_dir is None: list_depth = 0 model_fname = '' - for t in ('model=%s,%s' % (args.model, args.model_hparams)): + for t in ('model=%s,%s' % (model, model_hparams)): if t == '[': list_depth += 1 if t == ']': @@ -69,52 +47,78 @@ def main(): if t in '[]': t = '' model_fname += t - args.output_dir = os.path.join(args.logs_dir, model_fname) + args.output_dir_postfix - - if args.resume: - if args.checkpoint: + output_dir = os.path.join(logs_dir, model_fname) + output_dir_postfix + return output_dir + + +def get_model_hparams_dict(model_hparams_dict): + """ + Get model_hparams_dict from json file + """ + model_hparams_dict_load = {} + if model_hparams_dict: + with open(model_hparams_dict) as f: + model_hparams_dict_load.update(json.loads(f.read())) + return model_hparams_dict + +def resume_checkpoint(resume,checkpoint,output_dir): + """ + Resume the existing model checkpoints and set checkpoint directory + """ + if resume: + if checkpoint: raise ValueError('resume and checkpoint cannot both be specified') - args.checkpoint = args.output_dir + checkpoint = output_dir + return checkpoint +def set_seed(seed): + if seed is not None: + tf.set_random_seed(seed) + np.random.seed(seed) + random.seed(seed) - model_hparams_dict = {} - if args.model_hparams_dict: - with open(args.model_hparams_dict) as f: - model_hparams_dict.update(json.loads(f.read())) - if args.checkpoint: - checkpoint_dir = os.path.normpath(args.checkpoint) - if not os.path.isdir(args.checkpoint): +def load_params_from_checkpoints_dir(model_hparams_dict,checkpoint,dataset,model): + + model_hparams_dict_load = {} + if model_hparams_dict: + with open(model_hparams_dict) as f: + model_hparams_dict_load.update(json.loads(f.read())) + + if checkpoint: + checkpoint_dir = os.path.normpath(checkpoint) + if not os.path.isdir(checkpoint): checkpoint_dir, _ = os.path.split(checkpoint_dir) if not os.path.exists(checkpoint_dir): raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), checkpoint_dir) with open(os.path.join(checkpoint_dir, "options.json")) as f: print("loading options from checkpoint %s" % args.checkpoint) options = json.loads(f.read()) - args.dataset = args.dataset or options['dataset'] - args.model = args.model or 
@@ -123,9 +127,21 @@ def main():
     })
     model = VideoPredictionModel(
         hparams_dict=hparams_dict,
-        hparams=args.model_hparams)
+        hparams=model_hparams)
+    return model
+
 
-    batch_size = model.hparams.batch_size
+def save_dataset_model_params_to_checkpoint_dir(args, output_dir, train_dataset, model):
+    if not os.path.exists(output_dir):
+        os.makedirs(output_dir)
+    with open(os.path.join(output_dir, "options.json"), "w") as f:
+        f.write(json.dumps(vars(args), sort_keys=True, indent=4))
+    with open(os.path.join(output_dir, "dataset_hparams.json"), "w") as f:
+        f.write(json.dumps(train_dataset.hparams.values(), sort_keys=True, indent=4))
+    with open(os.path.join(output_dir, "model_hparams.json"), "w") as f:
+        f.write(json.dumps(model.hparams.values(), sort_keys=True, indent=4))
+    return None
+
+
+def make_dataset_iterator(train_dataset, val_dataset, batch_size):
     train_tf_dataset = train_dataset.make_dataset_v2(batch_size)
     train_iterator = train_tf_dataset.make_one_shot_iterator()
     # The `Iterator.string_handle()` method returns a tensor that can be evaluated
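The feedable-iterator pattern that make_dataset_iterator builds on (the from_string_handle call stays commented out in the code above) looks roughly like this in the TF1 API; a standalone sketch with toy datasets, not repo code:

    import tensorflow as tf  # TF1 API

    train_ds = tf.data.Dataset.from_tensor_slices([1., 2., 3.]).repeat()
    val_ds = tf.data.Dataset.from_tensor_slices([10., 20.]).repeat()
    train_it = train_ds.make_one_shot_iterator()
    val_it = val_ds.make_one_shot_iterator()

    # a generic iterator that is switched between datasets via feed_dict
    handle = tf.placeholder(tf.string, shape=[])
    iterator = tf.data.Iterator.from_string_handle(handle, train_ds.output_types, train_ds.output_shapes)
    next_elem = iterator.get_next()

    with tf.Session() as sess:
        train_h, val_h = sess.run([train_it.string_handle(), val_it.string_handle()])
        print(sess.run(next_elem, feed_dict={handle: train_h}))  # 1.0, from the train set
        print(sess.run(next_elem, feed_dict={handle: val_h}))    # 10.0, from the val set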
@@ -138,18 +154,87 @@ def main():
     #     train_handle, train_tf_dataset.output_types, train_tf_dataset.output_shapes)
     inputs = train_iterator.get_next()
     val = val_iterator.get_next()
-
-    model.build_graph(inputs)
+    return inputs, train_handle, val_handle
+
+
+def plot_train(train_losses, val_losses, output_dir):
+    epochs = list(range(len(train_losses)))
+    plt.plot(epochs, train_losses, 'g', label='Training loss')
+    plt.plot(epochs, val_losses, 'b', label='Validation loss')
+    plt.title('Training and Validation loss')
+    plt.xlabel('Epochs')
+    plt.ylabel('Loss')
+    plt.legend()
+    plt.savefig(os.path.join(output_dir, 'plot_train.png'))
+
+
+def save_results_to_dict(results_dict, output_dir):
+    with open(os.path.join(output_dir, "results.json"), "w") as fp:
+        json.dump(results_dict, fp)
+
+
+def save_results_to_pkl(train_losses, val_losses, output_dir):
+    with open(os.path.join(output_dir, "train_losses.pkl"), "wb") as f:
+        pkl.dump(train_losses, f)
+    with open(os.path.join(output_dir, "val_losses.pkl"), "wb") as f:
+        pkl.dump(val_losses, f)
+
 
-    if not os.path.exists(args.output_dir):
-        os.makedirs(args.output_dir)
-    with open(os.path.join(args.output_dir, "options.json"), "w") as f:
-        f.write(json.dumps(vars(args), sort_keys=True, indent=4))
-    with open(os.path.join(args.output_dir, "dataset_hparams.json"), "w") as f:
-        f.write(json.dumps(train_dataset.hparams.values(), sort_keys=True, indent=4))
-    with open(os.path.join(args.output_dir, "model_hparams.json"), "w") as f:
-        f.write(json.dumps(model.hparams.values(), sort_keys=True, indent=4))
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--input_dir", type=str, required=True, help="either a directory containing subdirectories "
+                                                                     "train, val, test, etc, or a directory containing "
+                                                                     "the tfrecords")
+    parser.add_argument("--val_input_dir", type=str, help="directories containing the tfrecords. default: input_dir")
+    parser.add_argument("--logs_dir", default='logs', help="ignored if output_dir is specified")
+    parser.add_argument("--output_dir", help="output directory where json files, summary, model, gifs, etc are saved. "
+                                             "default is logs_dir/model_fname, where model_fname consists of "
+                                             "information from model and model_hparams")
+    parser.add_argument("--output_dir_postfix", default="")
+    parser.add_argument("--checkpoint", help="directory with checkpoint or checkpoint name (e.g. checkpoint_dir/model-200000)")
+    parser.add_argument("--resume", action='store_true', help='resume from latest checkpoint in output_dir.')
+
+    parser.add_argument("--dataset", type=str, help="dataset class name")
+    parser.add_argument("--model", type=str, help="model class name")
+    parser.add_argument("--model_hparams", type=str, help="a string of comma separated list of model hyperparameters")
+    parser.add_argument("--model_hparams_dict", type=str, help="a json file of model hyperparameters")
+
+    parser.add_argument("--gpu_mem_frac", type=float, default=0, help="fraction of gpu memory to use")
+    parser.add_argument("--seed", default=1234, type=int)
+
+    args = parser.parse_args()
+
+    # set seed
+    set_seed(args.seed)
+
+    # set up the output directory
+    args.output_dir = generate_output_dir(args.output_dir, args.model, args.model_hparams, args.logs_dir, args.output_dir_postfix)
+
+    # resume from an existing checkpoint and set the checkpoint directory to the output directory
+    args.checkpoint = resume_checkpoint(args.resume, args.checkpoint, args.output_dir)
+
+    # get the model hparams dict from the json file and load the dataset and model configuration
+    # from an existing checkpoint (this information was stored in the checkpoint dir during the previous training run)
+    args.dataset, args.model, model_hparams_dict = load_params_from_checkpoints_dir(args.model_hparams_dict, args.checkpoint, args.dataset, args.model)
+
+    print('----------------------------------- Options ------------------------------------')
+    for k, v in args._get_kwargs():
+        print(k, "=", v)
+    print('------------------------------------- End --------------------------------------')
+
+    # set up the training and validation dataset instances
+    train_dataset, val_dataset, variable_scope = setup_dataset(args.dataset, args.input_dir, args.val_input_dir)
+
+    # set up the model instance
+    model = setup_model(args.model, model_hparams_dict, train_dataset, args.model_hparams)
+    batch_size = model.hparams.batch_size
+
+    # create the train and val iterators
+    inputs, train_handle, val_handle = make_dataset_iterator(train_dataset, val_dataset, batch_size)
+
+    # build the model graph
+    model.build_graph(inputs)
+
+    # save all the model and data params to the output directory
+    save_dataset_model_params_to_checkpoint_dir(args, args.output_dir, train_dataset, model)
+
     with tf.name_scope("parameter_count"):
         # exclude trainable variables that are replicas (used in multi-gpu setting)
         trainable_variables = set(tf.trainable_variables()) & set(model.saveable_variables)
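Read-back sketch for the pickle files written by save_results_to_pkl above; assumes a completed training run whose --output_dir is the current directory:

    import os
    import pickle as pkl

    output_dir = "."  # assumed: the --output_dir used at training time
    with open(os.path.join(output_dir, "train_losses.pkl"), "rb") as f:
        train_losses = pkl.load(f)
    with open(os.path.join(output_dir, "val_losses.pkl"), "rb") as f:
        val_losses = pkl.load(f)
    print(len(train_losses), len(val_losses))  # one entry per training step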
@@ -162,113 +247,83 @@ def main():
 
     gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=args.gpu_mem_frac, allow_growth=True)
     config = tf.ConfigProto(gpu_options=gpu_options, allow_soft_placement=True)
-
-    max_steps = model.hparams.max_steps
-    print ("max_steps",max_steps)
+    max_epochs = model.hparams.max_epochs  # the number of epochs
+    num_examples_per_epoch = train_dataset.num_examples_per_epoch()
+    print("number of examples per epoch:", num_examples_per_epoch)
+    steps_per_epoch = int(num_examples_per_epoch / batch_size)
+    total_steps = steps_per_epoch * max_epochs
+    # mock total_steps only for fast debugging
+    # total_steps = 10
+    print("total steps for training:", total_steps)
+    results_dict = {}
     with tf.Session(config=config) as sess:
         print("parameter_count =", sess.run(parameter_count))
         sess.run(tf.global_variables_initializer())
         sess.run(tf.local_variables_initializer())
-        #coord = tf.train.Coordinator()
-        #threads = tf.train.start_queue_runners(sess = sess, coord = coord)
-        print("Init done: {sess.run(tf.local_variables_initializer())}%")
         model.restore(sess, args.checkpoint)
-
-        #sess.run(model.post_init_ops)
-
-        #val_handle_eval = sess.run(val_handle)
-        #print ("val_handle_val",val_handle_eval)
-        #print("val handle done")
         sess.graph.finalize()
         start_step = sess.run(model.global_step)
-
-        # start at one step earlier to log everything without doing any training
         # step is relative to the start_step
-        for step in range(-1, max_steps - start_step):
+        train_losses = []
+        val_losses = []
+        run_start_time = time.time()
+        for step in range(total_steps):
             global_step = sess.run(model.global_step)
             print("global_step:", global_step)
             val_handle_eval = sess.run(val_handle)
-
-            if step == 1:
-                # skip step -1 and 0 for timing purposes (for warmstarting)
-                start_time = time.time()
-
+            # fetch variables in the graph
             fetches = {"global_step": model.global_step}
             fetches["train_op"] = model.train_op
-
-            # fetches["latent_loss"] = model.latent_loss
+            #fetches["latent_loss"] = model.latent_loss
             fetches["total_loss"] = model.total_loss
+
+            # fetch the model-specific losses only for mcnet
             if model.__class__.__name__ == "McNetVideoPredictionModel":
                 fetches["L_p"] = model.L_p
                 fetches["L_gdl"] = model.L_gdl
                 fetches["L_GAN"] = model.L_GAN
-
-
-
-            fetches["summary"] = model.summary_op
-
-            run_start_time = time.time()
-            #Run training results
-            #X = inputs["images"].eval(session=sess)
-
+            fetches["summary"] = model.summary_op
             results = sess.run(fetches)
-
-            run_elapsed_time = time.time() - run_start_time
-            if run_elapsed_time > 1.5 and step > 0 and set(fetches.keys()) == {"global_step", "train_op"}:
-                print('running train_op took too long (%0.1fs)' % run_elapsed_time)
-
-            #Run testing results
-            #val_fetches = {"global_step":global_step}
+            train_losses.append(results["total_loss"])
+
+            # fetch losses for the validation data
             val_fetches = {}
             #val_fetches["latent_loss"] = model.latent_loss
-            #val_fetches["total_loss"] = model.total_loss
+            val_fetches["total_loss"] = model.total_loss
             val_fetches["summary"] = model.summary_op
             val_results = sess.run(val_fetches, feed_dict={train_handle: val_handle_eval})
+            val_losses.append(val_results["total_loss"])
 
             summary_writer.add_summary(results["summary"])
             summary_writer.add_summary(val_results["summary"])
-
-            val_datasets = [val_dataset]
-            val_models = [model]
-
-            # for i, (val_dataset_, val_model) in enumerate(zip(val_datasets, val_models)):
-            #     sess.run(val_model.accum_eval_metrics_reset_op)
-            #     # traverse (roughly up to rounding based on the batch size) all the validation dataset
-            #     accum_eval_summary_num_updates = val_dataset_.num_examples_per_epoch() // val_model.hparams.batch_size
-            #     val_fetches = {"global_step": global_step, "accum_eval_summary": val_model.accum_eval_summary_op}
-            #     for update_step in range(accum_eval_summary_num_updates):
-            #         print('evaluating %d / %d' % (update_step + 1, accum_eval_summary_num_updates))
-            #         val_results = sess.run(val_fetches, feed_dict={train_handle: val_handle_eval})
-            #     accum_eval_summary = add_tag_suffix(val_results["accum_eval_summary"], '_%d' % (i + 1))
-            #     print("recording accum eval summary")
-            #     summary_writer.add_summary(accum_eval_summary, val_results["global_step"])
             summary_writer.flush()
 
             # global_step will have the correct step count if we resume from a checkpoint
             # global step is read before it's incremented
-            steps_per_epoch = train_dataset.num_examples_per_epoch() / batch_size
-            #train_epoch = results["global_step"] / steps_per_epoch
             train_epoch = global_step / steps_per_epoch
             print("progress  global step %d  epoch %0.1f" % (global_step + 1, train_epoch))
 
-            if step > 0:
-                elapsed_time = time.time() - start_time
-                average_time = elapsed_time / step
-                images_per_sec = batch_size / average_time
-                remaining_time = (max_steps - (start_step + step + 1)) * average_time
-                print("image/sec %0.1f  remaining %dm (%0.1fh) (%0.1fd)" %
-                      (images_per_sec, remaining_time / 60, remaining_time / 60 / 60, remaining_time / 60 / 60 / 24))
-
-            print("Total_loss:{}; L_p_loss:{}; L_gdl:{}; L_GAN: {}".format(results["total_loss"], results["L_p"], results["L_gdl"], results["L_GAN"]))
+            if model.__class__.__name__ == "McNetVideoPredictionModel":
+                print("Total_loss:{}; L_p_loss:{}; L_gdl:{}; L_GAN: {}".format(results["total_loss"], results["L_p"], results["L_gdl"], results["L_GAN"]))
+            elif model.__class__.__name__ == "VanillaConvLstmVideoPredictionModel":
+                print("Total_loss:{}".format(results["total_loss"]))
+            else:
+                print("The model name does not exist")
 
-            print("saving model to", args.output_dir)
-            saver.save(sess, os.path.join(args.output_dir, "model"), global_step=step) ##Bing: cheat here a little bit because of the global step issue
-            print("done")
-
+            #print("saving model to", args.output_dir)
+            saver.save(sess, os.path.join(args.output_dir, "model"), global_step=step)
+
+        train_time = time.time() - run_start_time
+        results_dict = {"train_time": train_time,
+                        "total_steps": total_steps}
+        save_results_to_dict(results_dict, args.output_dir)
+        save_results_to_pkl(train_losses, val_losses, args.output_dir)
+        print("train_losses:", train_losses)
+        print("val_losses:", val_losses)
+        plot_train(train_losses, val_losses, args.output_dir)
+        print("Done")
 
 
 if __name__ == '__main__':
     main()
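Worked example of the epoch-to-step conversion introduced in the hunk above, with assumed numbers rather than real dataset sizes:

    # assumed numbers for illustration
    num_examples_per_epoch = 1000
    batch_size = 8
    max_epochs = 20

    steps_per_epoch = int(num_examples_per_epoch / batch_size)  # 125
    total_steps = steps_per_epoch * max_epochs                  # 2500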
summary_writer.add_summary(val_results["summary"]) - - - - - val_datasets = [val_dataset] - val_models = [model] - - # for i, (val_dataset_, val_model) in enumerate(zip(val_datasets, val_models)): - # sess.run(val_model.accum_eval_metrics_reset_op) - # # traverse (roughly up to rounding based on the batch size) all the validation dataset - # accum_eval_summary_num_updates = val_dataset_.num_examples_per_epoch() // val_model.hparams.batch_size - # val_fetches = {"global_step": global_step, "accum_eval_summary": val_model.accum_eval_summary_op} - # for update_step in range(accum_eval_summary_num_updates): - # print('evaluating %d / %d' % (update_step + 1, accum_eval_summary_num_updates)) - # val_results = sess.run(val_fetches, feed_dict={train_handle: val_handle_eval}) - # accum_eval_summary = add_tag_suffix(val_results["accum_eval_summary"], '_%d' % (i + 1)) - # print("recording accum eval summary") - # summary_writer.add_summary(accum_eval_summary, val_results["global_step"]) summary_writer.flush() - + # global_step will have the correct step count if we resume from a checkpoint - # global step is read before it's incremented - steps_per_epoch = train_dataset.num_examples_per_epoch() / batch_size - #train_epoch = results["global_step"] / steps_per_epoch + # global step is read before it's incemented train_epoch = global_step/steps_per_epoch print("progress global step %d epoch %0.1f" % (global_step + 1, train_epoch)) - if step > 0: - elapsed_time = time.time() - start_time - average_time = elapsed_time / step - images_per_sec = batch_size / average_time - remaining_time = (max_steps - (start_step + step + 1)) * average_time - print("image/sec %0.1f remaining %dm (%0.1fh) (%0.1fd)" % - (images_per_sec, remaining_time / 60, remaining_time / 60 / 60, remaining_time / 60 / 60 / 24)) - - print("Total_loss:{}; L_p_loss:{}; L_gdl:{}; L_GAN: {}".format(results["total_loss"],results["L_p"],results["L_gdl"],results["L_GAN"])) + if model.__class__.__name__ == "McNetVideoPredictionModel": + print("Total_loss:{}; L_p_loss:{}; L_gdl:{}; L_GAN: {}".format(results["total_loss"],results["L_p"],results["L_gdl"],results["L_GAN"])) + elif model.__class__.__name__ == "VanillaConvLstmVideoPredictionModel": + print ("Total_loss:{}".format(results["total_loss"])) + else: + print ("The model name does not exist") - print("saving model to", args.output_dir) - saver.save(sess, os.path.join(args.output_dir, "model"), global_step=step)##Bing: cheat here a little bit because of the global step issue - print("done") - + #print("saving model to", args.output_dir) + saver.save(sess, os.path.join(args.output_dir, "model"), global_step=step)# + train_time = time.time() - run_start_time + results_dict = {"train_time":train_time, + "total_steps":total_steps} + save_results_to_dict(results_dict,args.output_dir) + save_results_to_pkl(train_losses, val_losses, args.output_dir) + print("train_losses:",train_losses) + print("val_losses:",val_losses) + plot_train(train_losses,val_losses,args.output_dir) + print("Done") + if __name__ == '__main__': main() diff --git a/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py b/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py index c0c803d8eb663abc31ccab85ee3766dea1295e36..4b3376e629f372c569e008f528d01fed6e372b15 100644 --- a/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py +++ b/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py @@ -102,9 +102,9 @@ class ERA5Dataset_v2(VarLenFeatureVideoDataset): seqs["images"] = images 
diff --git a/video_prediction_savp/video_prediction/models/vanilla_convLSTM_model.py b/video_prediction_savp/video_prediction/models/vanilla_convLSTM_model.py
index e7753004348ae0ae60057a469de1e2d1421c3869..c7f3db7ce4fce732312eba0d9f17362faa2e64b5 100644
--- a/video_prediction_savp/video_prediction/models/vanilla_convLSTM_model.py
+++ b/video_prediction_savp/video_prediction/models/vanilla_convLSTM_model.py
@@ -17,21 +17,17 @@ from video_prediction.layers import layer_def as ld
 from video_prediction.layers.BasicConvLSTMCell import BasicConvLSTMCell
 
 class VanillaConvLstmVideoPredictionModel(BaseVideoPredictionModel):
-    def __init__(self, mode='train', aggregate_nccl=None, hparams_dict=None,
+    def __init__(self, mode='train', hparams_dict=None,
                  hparams=None, **kwargs):
         super(VanillaConvLstmVideoPredictionModel, self).__init__(mode, hparams_dict, hparams, **kwargs)
         print ("Hparams_dict", self.hparams)
         self.mode = mode
         self.learning_rate = self.hparams.lr
-        self.gen_images_enc = None
-        self.recon_loss = None
-        self.latent_loss = None
         self.total_loss = None
-        self.context_frames = 10
-        self.sequence_length = 20
+        self.context_frames = self.hparams.context_frames
+        self.sequence_length = self.hparams.sequence_length
         self.predict_frames = self.sequence_length - self.context_frames
-        self.aggregate_nccl = aggregate_nccl
-
+        self.max_epochs = self.hparams.max_epochs
 
     def get_default_hparams_dict(self):
         """
         The keys of this dict define valid hyperparameters for instances of
@@ -44,12 +40,7 @@ class VanillaConvLstmVideoPredictionModel(BaseVideoPredictionModel):
             batch_size: batch size for training.
             lr: learning rate. if decay steps is non-zero, this is the
                 learning rate for steps <= decay_step.
-
-
-            max_steps: number of training steps.
-
-
             context_frames: the number of ground-truth frames to pass in at start.
                 Must be specified during instantiation.
             sequence_length: the number of frames in the video sequence,
@@ -62,37 +53,30 @@ class VanillaConvLstmVideoPredictionModel(BaseVideoPredictionModel):
         hparams = dict(
             batch_size=16,
             lr=0.001,
-            end_lr=0.0,
-            nz=16,
-            decay_steps=(200000, 300000),
-            max_steps=350000,
+            max_epochs=3000,
         )
         return dict(itertools.chain(default_hparams.items(), hparams.items()))
 
     def build_graph(self, x):
         self.x = x["images"]
-
+
         self.global_step = tf.Variable(0, name='global_step', trainable=False)
         original_global_variables = tf.global_variables()
         # ARCHITECTURE
-        self.x_hat_context_frames, self.x_hat_predict_frames = self.convLSTM_network()
-        self.x_hat = tf.concat([self.x_hat_context_frames, self.x_hat_predict_frames], 1)
-
-
-        self.context_frames_loss = tf.reduce_mean(
-            tf.square(self.x[:, :self.context_frames, :, :, 0] - self.x_hat_context_frames[:, :, :, :, 0]))
-        self.predict_frames_loss = tf.reduce_mean(
-            tf.square(self.x[:, self.context_frames:, :, :, 0] - self.x_hat_predict_frames[:, :, :, :, 0]))
-        self.total_loss = self.context_frames_loss + self.predict_frames_loss
+        self.convLSTM_network()
+        print("self.x", self.x)
+        print("self.x_hat_context_frames,", self.x_hat_context_frames)
+        #self.context_frames_loss = tf.reduce_mean(
+        #    tf.square(self.x[:, :self.context_frames, :, :, 0] - self.x_hat_context_frames[:, :, :, :, 0]))
+        self.total_loss = tf.reduce_mean(
+            tf.square(self.x[:, self.context_frames:, :, :, 0] - self.x_hat_context_frames[:, (self.context_frames-1):-1, :, :, 0]))
 
         self.train_op = tf.train.AdamOptimizer(
             learning_rate = self.learning_rate).minimize(self.total_loss, global_step = self.global_step)
         self.outputs = {}
         self.outputs["gen_images"] = self.x_hat
         # Summary op
-        self.loss_summary = tf.summary.scalar("recon_loss", self.context_frames_loss)
-        self.loss_summary = tf.summary.scalar("latent_loss", self.predict_frames_loss)
         self.loss_summary = tf.summary.scalar("total_loss", self.total_loss)
         self.summary_op = tf.summary.merge_all()
         global_variables = [var for var in tf.global_variables() if var not in original_global_variables]
@@ -101,7 +85,7 @@ class VanillaConvLstmVideoPredictionModel(BaseVideoPredictionModel):
 
     @staticmethod
-    def convLSTM_cell(inputs, hidden, nz=16):
+    def convLSTM_cell(inputs, hidden):
 
         conv1 = ld.conv_layer(inputs, 3, 2, 8, "encode_1", activate = "leaky_relu")
@@ -140,23 +124,28 @@ class VanillaConvLstmVideoPredictionModel(BaseVideoPredictionModel):
             VanillaConvLstmVideoPredictionModel.convLSTM_cell)  # make the template to share the variables
         # create network
         x_hat_context = []
-        x_hat_predict = []
-        seq_start = 1
+        x_hat = []
         hidden = None
-        for i in range(self.context_frames):
-            if i < seq_start:
+        # this is for training
+        for i in range(self.sequence_length):
+            if i < self.context_frames:
                 x_1, hidden = network_template(self.x[:, i, :, :, :], hidden)
             else:
                 x_1, hidden = network_template(x_1, hidden)
             x_hat_context.append(x_1)
-
-        for i in range(self.predict_frames):
-            x_1, hidden = network_template(x_1, hidden)
-            x_hat_predict.append(x_1)
-
+
+        # this is for generating the video
+        hidden_g = None
+        for i in range(self.sequence_length):
+            if i < self.context_frames:
+                x_1_g, hidden_g = network_template(self.x[:, i, :, :, :], hidden_g)
+            else:
+                x_1_g, hidden_g = network_template(x_1_g, hidden_g)
+            x_hat.append(x_1_g)
+
         # pack them all together
         x_hat_context = tf.stack(x_hat_context)
-        x_hat_predict = tf.stack(x_hat_predict)
-        self.x_hat_context = tf.transpose(x_hat_context, [1, 0, 2, 3, 4])  # change first dim with sec dim
-        self.x_hat_predict = tf.transpose(x_hat_predict, [1, 0, 2, 3, 4])  # change first dim with sec dim
-        return self.x_hat_context, self.x_hat_predict
+        x_hat = tf.stack(x_hat)
+        self.x_hat_context_frames = tf.transpose(x_hat_context, [1, 0, 2, 3, 4])  # swap the time and batch dimensions -> (batch, time, ...)
+        self.x_hat = tf.transpose(x_hat, [1, 0, 2, 3, 4])  # swap the time and batch dimensions -> (batch, time, ...)
+        self.x_hat_predict_frames = self.x_hat[:, self.context_frames:, :, :, :]
diff --git a/workflow_parallel_frame_prediction/DataExtraction/main_single_master.py b/workflow_parallel_frame_prediction/DataExtraction/main_single_master.py
index 4894408adeb1a41b106faafb75def912ca5e4ad5..fda72c671d2804e87b121a2bd62038890a7f5161 100644
--- a/workflow_parallel_frame_prediction/DataExtraction/main_single_master.py
+++ b/workflow_parallel_frame_prediction/DataExtraction/main_single_master.py
@@ -93,19 +93,33 @@ def main():
             if clear_destination == 1:
                 shutil.rmtree(destination_dir)
                 os.mkdir(destination_dir)
-                logger.critical("Destination : {destination} exist -> Remove and Re-Cereate".format(destination=destination_dir))
-                print("Destination : {destination} exist -> Remove and Re-Cereate".format(destination=destination_dir))
+                logger.critical("Destination : {destination} exist -> Remove and Re-Create".format(destination=destination_dir))
+                print("Destination : {destination} exist -> Remove and Re-Create".format(destination=destination_dir))
             else:
                 logger.critical("Destination : {destination} exist -> will not be removed (caution : overwrite)".format(destination=destination_dir))
                 print("Destination : {destination} exist -> will not be removed (caution : overwrite)".format(destination=destination_dir))
+
+
+
+    # 20200630 +++ Scarlet
+    else:
+        if my_rank == 0:
+            os.makedirs(destination_dir) #, exist_ok=True)
+            logger.info("Destination : {destination} does not exist -> Create".format(destination=destination_dir))
+            print("Destination : {destination} does not exist -> Create".format(destination=destination_dir))
+
+    # 20200630 --- Scarlet
+
     # Create a log folder for slave-nodes to write down their processes
     slave_log_path = os.path.join(destination_dir, log_temp)
     if my_rank == 0:
         if os.path.exists(slave_log_path) == False:
-            os.mkdir(slave_log_path)
+            # 20200630 Scarlet
+            #os.mkdir(slave_log_path)
+            os.makedirs(slave_log_path)
 
     if my_rank == 0:
         # node is master
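Why os.makedirs replaces os.mkdir in this file: it also creates missing intermediate directories, and the exist_ok=True flag (left commented out in the patch) would additionally tolerate a directory already created by another MPI rank. A minimal sketch, independent of the repo:

    import os
    import tempfile

    base = tempfile.mkdtemp()
    nested = os.path.join(base, "a", "b", "c")
    os.makedirs(nested)                 # creates a/, a/b/ and a/b/c/ in one call; os.mkdir would fail on the missing parents
    os.makedirs(nested, exist_ok=True)  # no error although the directory already exists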
"2222":[1,2,3,5,6,7,8,9,10,11,12], - "2012":[1,2,3,4,5,6,7,8,9,10,11,12], - "2015":[1,2,3,4,5,6,7,8,9,10,11,12], - "2016":[1,2,3,4,5,6,7,8,9,10,11,12], - "2017":[1,2,3,4,5,6,7,8,9,10,11,12], - "2019":[1,2,3,4,5,6,7,8,9,10,11,12] - }, - "val": - {"2013_complete":[1,2,3,4,5,6,7,8,9,10,11,12] - }, - "test": - {"2010_1":[1,2,3,4,5,6,7,8,9,10,11,12] - } - }