Commit 597d05ef authored by stadtler1

Merge branch 'bing_issue#009_clean_up_postprocessing' into develop

parents 6e5f2ddf 1706d4b4
Pipeline #42595 failed
Showing changed files with 1488 additions and 1338 deletions
......@@ -88,6 +88,7 @@ celerybeat-schedule
venv/
ENV/
virtual_env*/
virt_env*/
# Spyder project settings
.spyderproject
......@@ -108,8 +109,8 @@ virtual_env*/
*.DS_Store
# Ignore log- and errorfiles
*-err.???????
*-out.???????
*-err.[0-9]*
*-out.[0-9]*
#Ignore the results files
......
#!/bin/bash -x
## Controlling Batch-job
#SBATCH --account=deepacf
#SBATCH --nodes=1
#SBATCH --ntasks=13
......@@ -6,18 +7,40 @@
#SBATCH --cpus-per-task=1
#SBATCH --output=DataExtraction-out.%j
#SBATCH --error=DataExtraction-err.%j
#SBATCH --time=00:20:00
#SBATCH --time=05:00:00
#SBATCH --partition=devel
#SBATCH --mail-type=ALL
#SBATCH --mail-user=s.stadtler@fz-juelich.de
##jutil env activate -p deepacf
module purge
module use $OTHERSTAGES
module load Stages/2019a
module load Intel/2019.3.199-GCC-8.3.0 ParaStationMPI/5.2.2-1
module load h5py/2.9.0-Python-3.6.8
module load mpi4py/3.0.1-Python-3.6.8
module load netcdf4-python/1.5.0.1-Python-3.6.8
srun python ../../workflow_parallel_frame_prediction/DataExtraction/mpi_stager_v2.py --source_dir /p/fastdata/slmet/slmet111/met_data/ecmwf/era5/nc/2017/ --destination_dir /p/scratch/deepacf/scarlet/extractedData
#SBATCH --mail-user=b.gong@fz-juelich.de
jutil env activate -p deepacf
# Name of virtual environment
VIRT_ENV_NAME="virt_env_hdfml"
# Loading modules
source ../env_setup/modules_preprocess.sh
# Activate virtual environment if needed (and possible)
if [ -z ${VIRTUAL_ENV} ]; then
if [[ -f ../${VIRT_ENV_NAME}/bin/activate ]]; then
echo "Activating virtual environment..."
source ../${VIRT_ENV_NAME}/bin/activate
else
echo "ERROR: Requested virtual environment ${VIRT_ENV_NAME} not found..."
exit 1
fi
fi
# Declare path-variables
source_dir="/p/fastdata/slmet/slmet111/met_data/ecmwf/era5/nc/"
dest_dir="/p/scratch/deepacf/video_prediction_shared_folder/extractedData/"
year="2010"
# Run data extraction
srun python ../../workflow_parallel_frame_prediction/DataExtraction/mpi_stager_v2.py --source_dir ${source_dir}/${year}/ --destination_dir ${dest_dir}/${year}/
# 2tier pystager
#srun python ../../workflow_parallel_frame_prediction/DataExtraction/main_single_master.py --source_dir /p/fastdata/slmet/slmet111/met_data/ecmwf/era5/nc/${year}/ --destination_dir ${SAVE_DIR}/extractedData/${year}
#!/bin/bash -x
## Controlling Batch-job
#SBATCH --account=deepacf
#SBATCH --nodes=1
#SBATCH --ntasks=12
......@@ -6,38 +7,58 @@
#SBATCH --cpus-per-task=1
#SBATCH --output=DataPreprocess-out.%j
#SBATCH --error=DataPreprocess-err.%j
#SBATCH --time=02:20:00
#SBATCH --partition=batch
#SBATCH --time=00:20:00
#SBATCH --partition=devel
#SBATCH --mail-type=ALL
#SBATCH --mail-user=b.gong@fz-juelich.de
module --force purge
module use $OTHERSTAGES
module load Stages/2019a
module load Intel/2019.3.199-GCC-8.3.0 ParaStationMPI/5.2.2-1
module load h5py/2.9.0-Python-3.6.8
module load mpi4py/3.0.1-Python-3.6.8
# Name of virtual environment
VIRT_ENV_NAME="virt_env_hdfml"
srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py \
--source_dir /p/scratch/deepacf/video_prediction_shared_folder/extractedData/2015/ \
--destination_dir /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/era5-Y2015toY2017M01to12-128x160-74d00N71d00E-T_MSL_gph500/2015/ \
--vars T2 MSL gph500 --lat_s 74 --lat_e 202 --lon_s 550 --lon_e 710
# Activate virtual environment if needed (and possible)
if [ -z ${VIRTUAL_ENV} ]; then
if [[ -f ../${VIRT_ENV_NAME}/bin/activate ]]; then
echo "Activating virtual environment..."
source ../${VIRT_ENV_NAME}/bin/activate
else
echo "ERROR: Requested virtual environment ${VIRT_ENV_NAME} not found..."
exit 1
fi
fi
# Loading modules
source ../env_setup/modules_preprocess.sh
srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py \
--source_dir /p/scratch/deepacf/video_prediction_shared_folder/extractedData/2016/ \
--destination_dir /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/era5-Y2015toY2017M01to12-128x160-74d00N71d00E-T_MSL_gph500/2016/ \
--vars T2 MSL gph500 --lat_s 74 --lat_e 202 --lon_s 550 --lon_e 710
source_dir=${SAVE_DIR}/extractedData
destination_dir=${SAVE_DIR}/preprocessedData/era5-Y2015to2017M01to12
script_dir=`pwd`
srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py \
--source_dir /p/scratch/deepacf/video_prediction_shared_folder/extractedData/2017/ \
--destination_dir /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/era5-Y2015toY2017M01to12-128x160-74d00N71d00E-T_MSL_gph500/2017/ \
--vars T2 MSL gph500 --lat_s 74 --lat_e 202 --lon_s 550 --lon_e 710
declare -a years=("2222"
"2010_1"
"2012"
"2013_complete"
"2015"
"2016"
"2017"
"2019"
)
declare -a years=(
"2015"
"2016"
"2017"
)
# execute Python-scripts
for year in "${years[@]}"; do
echo "Year $year"
echo "source_dir ${source_dir}/${year}"
srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py \
--source_dir ${source_dir} -scr_dir ${script_dir} \
--destination_dir ${destination_dir} --years ${year} --vars T2 MSL gph500 --lat_s 74 --lat_e 202 --lon_s 550 --lon_e 710
done
#srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py \
# --source_dir /p/scratch/deepacf/video_prediction_shared_folder/extractedData/2017 \
# --destination_dir /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/Y2016toY2017M01to12-128x160-74d00N71d0E-T_MSL_gph500/2017 \
# --vars T2 MSL gph500 --lat_s 74 --lat_e 202 --lon_s 550 --lon_e 710
#srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_split_data_multi_years.py --destination_dir ${destination_dir} --varnames T2 MSL gph500
#!/bin/bash -x
#SBATCH --account=deepacf
#SBATCH --nodes=1
#SBATCH --ntasks=12
##SBATCH --ntasks-per-node=12
#SBATCH --cpus-per-task=1
#SBATCH --output=DataPreprocess-out.%j
#SBATCH --error=DataPreprocess-err.%j
#SBATCH --time=00:20:00
#SBATCH --partition=devel
#SBATCH --mail-type=ALL
#SBATCH --mail-user=m.langguth@fz-juelich.de
module --force purge
module use $OTHERSTAGES
module load Stages/2019a
module load Intel/2019.3.199-GCC-8.3.0 ParaStationMPI/5.2.2-1
module load h5py/2.9.0-Python-3.6.8
module load mpi4py/3.0.1-Python-3.6.8
source_dir=/p/scratch/deepacf/video_prediction_shared_folder/extractedData
destination_dir=/p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/era5-Y2015toY2017M01to12-128x160-74d00N71d00E-T_MSL_gph500/hickle
declare -a years=("2015"
"2016"
"2017"
)
for year in "${years[@]}";
do
echo "Year $year"
echo "source_dir ${source_dir}/${year}"
srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py \
--source_dir ${source_dir}/${year}/ \
--destination_dir ${destination_dir}/${year}/ --vars T2 MSL gph500 --lat_s 74 --lat_e 202 --lon_s 550 --lon_e 710
done
srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_split_data_multi_years.py --destination_dir ${destination_dir}
#srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py \
# --source_dir /p/scratch/deepacf/video_prediction_shared_folder/extractedData/2015/ \
# --destination_dir /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/era5-Y2015toY2017M01to12-128x160-74d00N71d00E-T_MSL_gph500/2015/ \
# --vars T2 MSL gph500 --lat_s 74 --lat_e 202 --lon_s 550 --lon_e 710
#srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py \
# --source_dir /p/scratch/deepacf/video_prediction_shared_folder/extractedData/2016/ \
# --destination_dir /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/era5-Y2015toY2017M01to12-128x160-74d00N71d00E-T_MSL_gph500/2016/ \
# --vars T2 MSL gph500 --lat_s 74 --lat_e 202 --lon_s 550 --lon_e 710
#srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py \
# --source_dir /p/scratch/deepacf/video_prediction_shared_folder/extractedData/2017/ \
# --destination_dir /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/era5-Y2015toY2017M01to12-128x160-74d00N71d00E-T_MSL_gph500/2017/ \
# --vars T2 MSL gph500 --lat_s 74 --lat_e 202 --lon_s 550 --lon_e 710
#srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_split_data_multi_years.py \
#--destination_dir /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/era5-Y2015toY2017M01to12-128x160-74d00N71d00E-#T_MSL_gph500/
#srun python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py \
# --source_dir /p/scratch/deepacf/video_prediction_shared_folder/extractedData/2017 \
# --destination_dir /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/Y2016toY2017M01to12-128x160-74d00N71d0E-T_MSL_gph500/2017 \
# --vars T2 MSL gph500 --lat_s 74 --lat_e 202 --lon_s 550 --lon_e 710
#!/bin/bash -x
#SBATCH --account=deepacf
#SBATCH --nodes=1
#SBATCH --ntasks=12
##SBATCH --ntasks-per-node=12
#SBATCH --ntasks=13
##SBATCH --ntasks-per-node=13
#SBATCH --cpus-per-task=1
#SBATCH --output=DataPreprocess_to_tf-out.%j
#SBATCH --error=DataPreprocess_to_tf-err.%j
......@@ -11,12 +11,26 @@
#SBATCH --mail-type=ALL
#SBATCH --mail-user=b.gong@fz-juelich.de
module purge
module use $OTHERSTAGES
module load Stages/2019a
module load Intel/2019.3.199-GCC-8.3.0 ParaStationMPI/5.2.2-1
module load h5py/2.9.0-Python-3.6.8
module load mpi4py/3.0.1-Python-3.6.8
module load TensorFlow/1.13.1-GPU-Python-3.6.8
srun python ../video_prediction/datasets/era5_dataset_v2.py /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/Y2016M01to12-128_160-74.00N710E-T_T_T/splits/ /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/Y2016M01to12-128_160-74.00N710E-T_T_T/tfrecords/ -vars T2 T2 T2
# Name of virtual environment
VIRT_ENV_NAME="vp"
# Loading modules
source ../env_setup/modules_train.sh
# Activate virtual environment if needed (and possible)
if [ -z ${VIRTUAL_ENV} ]; then
if [[ -f ../${VIRT_ENV_NAME}/bin/activate ]]; then
echo "Activating virtual environment..."
source ../${VIRT_ENV_NAME}/bin/activate
else
echo "ERROR: Requested virtual environment ${VIRT_ENV_NAME} not found..."
exit 1
fi
fi
# declare directory-variables which will be modified appropriately during Preprocessing (invoked by mpi_split_data_multi_years.py)
source_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/
destination_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/
# run Preprocessing (step 2 where Tf-records are generated)
srun python ../video_prediction/datasets/era5_dataset_v2.py ${source_dir}/pickle ${destination_dir}/tfrecords -vars T2 MSL gph500 -height 128 -width 160 -seq_length 20
......@@ -13,19 +13,33 @@
#SBATCH --mail-user=b.gong@fz-juelich.de
##jutil env activate -p cjjsc42
# Name of virtual environment
VIRT_ENV_NAME="vp"
module purge
module load GCC/8.3.0
module load ParaStationMPI/5.2.2-1
module load TensorFlow/1.13.1-GPU-Python-3.6.8
module load netcdf4-python/1.5.0.1-Python-3.6.8
module load h5py/2.9.0-Python-3.6.8
# Loading modules
source ../env_setup/modules_train.sh
# Activate virtual environment if needed (and possible)
if [ -z ${VIRTUAL_ENV} ]; then
if [[ -f ../${VIRT_ENV_NAME}/bin/activate ]]; then
echo "Activating virtual environment..."
source ../${VIRT_ENV_NAME}/bin/activate
else
echo "ERROR: Requested virtual environment ${VIRT_ENV_NAME} not found..."
exit 1
fi
fi
# declare directory-variables which will be modified appropriately during Preprocessing (invoked by mpi_split_data_multi_years.py)
source_dir=/p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/
checkpoint_dir=/p/scratch/deepacf/video_prediction_shared_folder/models/
results_dir=/p/scratch/deepacf/video_prediction_shared_folder/results/
python -u ../scripts/generate_transfer_learning_finetune.py \
--input_dir /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/era5-Y2017M01to12-64x64-50d00N11d50E-T_T_T/tfrecords/ \
--dataset_hparams sequence_length=20 --checkpoint /p/scratch/deepacf/video_prediction_shared_folder/models/era5-Y2017M01to12-64x64-50d00N11d50E-T_T_T/ours_gan \
--mode test --results_dir /p/scratch/deepacf/video_prediction_shared_folder/results/era5-Y2017M01to12-64x64-50d00N11d50E-T_T_T \
--batch_size 4 --dataset era5 > generate_era5-out.out
# name of model
model=convLSTM
# run postprocessing/generation of model results including evaluation metrics
srun python -u ../scripts/generate_transfer_learning_finetune.py \
--input_dir ${source_dir}/tfrecords --dataset_hparams sequence_length=20 --checkpoint ${checkpoint_dir}/${model} \
--mode test --model ${model} --results_dir ${results_dir}/${model}/ --batch_size 2 --dataset era5 > generate_era5-out.out
#srun python scripts/train.py --input_dir data/era5 --dataset era5 --model savp --model_hparams_dict hparams/kth/ours_savp/model_hparams.json --output_dir logs/era5/ours_savp
#!/usr/bin/env bash
# for choosing the model
export model=convLSTM
export model_hparams=../hparams/era5/${model}/model_hparams.json
# Create a subfolder named with the creation time and the user name, which can be considered a hyperparameter-tuning folder. This avoids overwriting previously trained models that used different hyperparameters.
export hyperdir="$(date +"%Y%m%dT%H%M")_"$USER""
echo "model: ${model}"
echo "hparams: ${model_hparams}"
echo "experiment dir: ${hyperdir}"
#!/usr/bin/env bash
sed -i "s|source_dir=.*|source_dir=${SAVE_DIR}preprocessedData/|g" DataPreprocess_to_tf.sh
sed -i "s|destination_dir=.*|destination_dir=${SAVE_DIR}preprocessedData/|g" DataPreprocess_to_tf.sh
sed -i "s|source_dir=.*|source_dir=${SAVE_DIR}preprocessedData/|g" train_era5.sh
sed -i "s|destination_dir=.*|destination_dir=${SAVE_DIR}models/|g" train_era5.sh
sed -i "s|source_dir=.*|source_dir=${SAVE_DIR}preprocessedData/|g" generate_era5.sh
sed -i "s|checkpoint_dir=.*|checkpoint_dir=${SAVE_DIR}models/|g" generate_era5.sh
sed -i "s|results_dir=.*|results_dir=${SAVE_DIR}results/|g" generate_era5.sh
......@@ -7,21 +7,41 @@
#SBATCH --output=train_era5-out.%j
#SBATCH --error=train_era5-err.%j
#SBATCH --time=00:20:00
#SBATCH --gres=gpu:1
#SBATCH --gres=gpu:2
#SBATCH --partition=develgpus
#SBATCH --mail-type=ALL
#SBATCH --mail-user=b.gong@fz-juelich.de
##jutil env activate -p cjjsc42
module --force purge
module use $OTHERSTAGES
module load Stages/2019a
module load GCCcore/.8.3.0
module load mpi4py/3.0.1-Python-3.6.8
module load h5py/2.9.0-serial-Python-3.6.8
module load TensorFlow/1.13.1-GPU-Python-3.6.8
module load cuDNN/7.5.1.10-CUDA-10.1.105
# Name of virtual environment
VIRT_ENV_NAME="vp"
# Loading modules
source ../env_setup/modules_train.sh
# Activate virtual environment if needed (and possible)
if [ -z ${VIRTUAL_ENV} ]; then
if [[ -f ../${VIRT_ENV_NAME}/bin/activate ]]; then
echo "Activating virtual environment..."
source ../${VIRT_ENV_NAME}/bin/activate
else
echo "ERROR: Requested virtual environment ${VIRT_ENV_NAME} not found..."
exit 1
fi
fi
# declare directory-variables which will be modified appropriately during Preprocessing (invoked by mpi_split_data_multi_years.py)
source_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/
destination_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/models/
# for choosing the model
model=convLSTM
model_hparams=../hparams/era5/${model}/model_hparams.json
# run training
srun python ../scripts/train_dummy.py --input_dir ${source_dir}/tfrecords/ --dataset era5 --model ${model} --model_hparams_dict ${model_hparams} --output_dir ${destination_dir}/${model}/
srun python ../scripts/train_v2.py --input_dir /p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/2017M01to12-64_64-50.00N11.50E-T_T_T/tfrecords --dataset era5 --model savp --model_hparams_dict ../hparams/kth/ours_savp/model_hparams.json --output_dir /p/scratch/deepacf/video_prediction_shared_folder/models/2017M01to12-64_64-50.00N11.50E-T_T_T/ours_savp
#srun python scripts/train.py --input_dir data/era5 --dataset era5 --model savp --model_hparams_dict hparams/kth/ours_savp/model_hparams.json --output_dir logs/era5/ours_savp
#!/usr/bin/env bash
#
# __authors__ = Bing Gong, Michael Langguth
# __date__ = '2020_07_24'
if [[ ! -n "$1" ]]; then
echo "Provide the user name, which will be taken as folder name"
# This script can be used for setting up the virtual environment needed for the ambs-project
# or to simply activate it.
#
# some first sanity checks
if [[ ${BASH_SOURCE[0]} == ${0} ]]; then
echo "ERROR: 'create_env.sh' must be sourced, i.e. execute by prompting 'source create_env.sh [virt_env_name]'"
exit 1
fi
if [[ ! -n "$2" ]]; then
echo "Provide the env name, which will be taken as folder name"
exit 1
# from now on, just return if something unexpected occurs instead of exiting
# as the latter would close the terminal session (i.e. log the user out)
if [[ ! -n "$1" ]]; then
echo "ERROR: Provide a name to set up the virtual environment, i.e. execute by prompting 'source create_env.sh [virt_env_name]"
return
fi
ENV_NAME=$2
FOLDER_NAME=$1
WORKING_DIR=/p/project/deepacf/deeprain/${FOLDER_NAME}/video_prediction_savp
ENV_SETUP_DIR=${WORKING_DIR}/env_setup
HOST_NAME=`hostname`
ENV_NAME=$1
ENV_SETUP_DIR=`pwd`
WORKING_DIR="$(dirname "$ENV_SETUP_DIR")"
EXE_DIR="$(basename "$ENV_SETUP_DIR")"
ENV_DIR=${WORKING_DIR}/${ENV_NAME}
source ${ENV_SETUP_DIR}/modules.sh
# Install additional Python packages.
# further sanity checks:
# * ensure execution from env_setup-directory
# * check if virtual env has already been set up
if [[ "${EXE_DIR}" != "env_setup" ]]; then
echo "ERROR: The setup-script for the virtual environment from the env_setup-directory!"
return
fi
if [[ -d ${ENV_DIR} ]]; then
echo "Virtual environment has already been set up under ${ENV_DIR}. The present virtual environment is activated now."
echo "NOTE: If you wish to set up a new virtual environment, delete the existing one or provide a different name."
ENV_EXIST=1
else
ENV_EXIST=0
fi
# add personal email-address to Batch-scripts
if [[ "${HOST_NAME}" == hdfml* || "${HOST_NAME}" == juwels* ]]; then
USER_EMAIL=$(jutil user show -o json | grep email | cut -f2 -d':' | cut -f1 -d',' | cut -f2 -d'"')
#replace the email in sbatch script with the USER_EMAIL
sed -i "s/--mail-user=.*/--mail-user=$USER_EMAIL/g" ../HPC_scripts/*.sh
# load modules and check for their availability
echo "***** Checking modules required during the workflow... *****"
source ${ENV_SETUP_DIR}/modules_preprocess.sh
source ${ENV_SETUP_DIR}/modules_train.sh
elif [[ "${HOST_NAME}" == "zam347" ]]; then
unset PYTHONPATH
fi
if [[ "$ENV_EXIST" == 0 ]]; then
# Activate virtual environment and install additional Python packages.
echo "Configuring and activating virtual environment on ${HOST_NAME}"
python3 -m venv $ENV_DIR
source ${ENV_DIR}/bin/activate
pip3 install -r ${ENV_SETUP_DIR}/requirements.txt
#pip3 install --user netCDF4
#pip3 install --user numpy
#Copy the hickle package from bing's account
cp -r /p/project/deepacf/deeprain/bing/hickle ${WORKING_DIR}
activate_virt_env=${ENV_DIR}/bin/activate
echo ${activate_virt_env}
source ${ENV_SETUP_DIR}/modules.sh
source ${ENV_DIR}/bin/activate
source ${activate_virt_env}
export PYTHONPATH=${WORKING_DIR}/hickle/lib/python3.6/site-packages:$PYTHONPATH
export PYTHONPATH=${WORKING_DIR}:$PYTHONPATH
export PYTHONPATH=${ENV_DIR}/lib/python3.6/site-packages:$PYTHONPATH
# install some requirements and/or check for modules
if [[ "${HOST_NAME}" == hdfml* || "${HOST_NAME}" == juwels* ]]; then
# check module availability for the first time on known HPC-systems
echo "***** Start installing additional Python modules with pip... *****"
pip3 install --no-cache-dir --ignore-installed -r ${ENV_SETUP_DIR}/requirements.txt
#pip3 install --user netCDF4
#pip3 install --user numpy
elif [[ "${HOST_NAME}" == "zam347" ]]; then
echo "***** Start installing additional Python modules with pip... *****"
pip3 install --upgrade pip
pip3 install -r ${ENV_SETUP_DIR}/requirements.txt
pip3 install mpi4py
pip3 install netCDF4
pip3 install numpy
pip3 install h5py
pip3 install tensorflow-gpu==1.13.1
fi
# expand PYTHONPATH...
export PYTHONPATH=${WORKING_DIR}:$PYTHONPATH >> ${activate_virt_env}
#export PYTHONPATH=/p/home/jusers/${USER}/juwels/.local/bin:$PYTHONPATH
export PYTHONPATH=${WORKING_DIR}/lpips-tensorflow:$PYTHONPATH
export PYTHONPATH=${WORKING_DIR}/external_package/lpips-tensorflow:$PYTHONPATH >> ${activate_virt_env}
if [[ "${HOST_NAME}" == hdfml* || "${HOST_NAME}" == juwels* ]]; then
export PYTHONPATH=${ENV_DIR}/lib/python3.6/site-packages:$PYTHONPATH >> ${activate_virt_env}
fi
# ...and ensure that this is also done whenever the virtual environment is activated later on
echo "" >> ${activate_virt_env}
echo "# Expand PYTHONPATH..." >> ${activate_virt_env}
echo "export PYTHONPATH=${WORKING_DIR}:\$PYTHONPATH" >> ${activate_virt_env}
#export PYTHONPATH=/p/home/jusers/${USER}/juwels/.local/bin:\$PYTHONPATH
echo "export PYTHONPATH=${WORKING_DIR}/external_package/lpips-tensorflow:\$PYTHONPATH" >> ${activate_virt_env}
if [[ "${HOST_NAME}" == hdfml* || "${HOST_NAME}" == juwels* ]]; then
echo "export PYTHONPATH=${ENV_DIR}/lib/python3.6/site-packages:\$PYTHONPATH" >> ${activate_virt_env}
fi
elif [[ "$ENV_EXIST" == 1 ]]; then
# activating the virtual env is sufficient
source ${ENV_DIR}/bin/activate
fi
#!/usr/bin/env bash
# __author__ = Bing Gong, Michael Langguth
# __date__ = '2020_06_26'
# This script loads the required modules for ambs on Juwels and HDF-ML.
# Note that some other packages have to be installed into a venv (see create_env.sh and requirements.txt).
HOST_NAME=`hostname`
echo "Start loading modules on ${HOST_NAME} required for preprocessing..."
echo "This script is used by: "
echo "* DataExtraction.sh"
echo "* DataPreprocess.sh"
module purge
module use $OTHERSTAGES
module load Stages/2019a
module load GCC/8.3.0
module load ParaStationMPI/5.2.2-1
module load mpi4py/3.0.1-Python-3.6.8
# note: only the serial h5py version is available on HDF-ML
# see https://gitlab.version.fz-juelich.de/haf/Wiki/-/wikis/HDF-ML%20System
if [[ "${HOST_NAME}" == hdfml* ]]; then
module load h5py/2.9.0-serial-Python-3.6.8
elif [[ "${HOST_NAME}" == juwels* ]]; then
module load h5py/2.9.0-Python-3.6.8
fi
module load netcdf4-python/1.5.0.1-Python-3.6.8
#!/usr/bin/env bash
# __author__ = Bing Gong, Michael Langguth
# __date__ = '2020_06_26'
# This script loads the required modules for ambs on Juwels and HDF-ML.
# Note that some other packages have to be installed into a venv (see create_env.sh and requirements.txt).
HOST_NAME=`hostname`
echo "Start loading modules on ${HOST_NAME}..."
module purge
module use $OTHERSTAGES
module load Stages/2019a
module load GCC/8.3.0
module load MVAPICH2/.2.3.1-GDR
module load GCCcore/.8.3.0
module load ParaStationMPI/5.2.2-1
module load mpi4py/3.0.1-Python-3.6.8
# serialized version of HDF5 is used since only this version is compatible with TensorFlow/1.13.1-GPU-Python-3.6.8
module load h5py/2.9.0-serial-Python-3.6.8
module load TensorFlow/1.13.1-GPU-Python-3.6.8
module load cuDNN/7.5.1.10-CUDA-10.1.105
module load netcdf4-python/1.5.0.1-Python-3.6.8
......@@ -2,3 +2,4 @@ opencv-python
scipy
scikit-image
pandas
hickle
{
"batch_size": 8,
"batch_size": 10,
"lr": 0.001,
"nz": 16,
"max_steps":500,
"max_epochs":2,
"context_frames":10,
"sequence_length":20
}
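The file above is a plain JSON dictionary of model hyperparameters, referenced in the training scripts as ../hparams/era5/${model}/model_hparams.json. A minimal sketch (not part of this commit) of how such a file can be inspected; the concrete path is an assumption taken from train_era5.sh with model=convLSTM:

import json

# load the hyperparameter dictionary (path assumed from train_era5.sh with model=convLSTM)
with open("../hparams/era5/convLSTM/model_hparams.json") as js_file:
    hparams = json.load(js_file)

# e.g. batch size, number of context frames and total sequence length per sample
print(hparams["batch_size"], hparams["context_frames"], hparams["sequence_length"])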
......@@ -4,6 +4,7 @@ Classes and routines to retrieve and handle meta-data
import os
import sys
import time
import numpy as np
import json
from netCDF4 import Dataset
......@@ -23,7 +24,9 @@ class MetaData:
method_name = MetaData.__init__.__name__+" of Class "+MetaData.__name__
if not json_file is None:
MetaData.get_metadata_from_file(json_file)
print(json_file)
print(type(json_file))
MetaData.get_metadata_from_file(self,json_file)
else:
# No dictionary from json-file available, all other arguments have to be set
......@@ -90,9 +93,11 @@ class MetaData:
self.nx, self.ny = np.abs(slices['lon_e'] - slices['lon_s']), np.abs(slices['lat_e'] - slices['lat_s'])
sw_c = [float(datafile.variables['lat'][slices['lat_e']-1]),float(datafile.variables['lon'][slices['lon_s']])] # meridional axis lat is oriented from north to south (i.e. monotonically decreasing)
self.sw_c = sw_c
self.lat = datafile.variables['lat'][slices['lat_s']:slices['lat_e']]
self.lon = datafile.variables['lon'][slices['lon_s']:slices['lon_e']]
# Now start constructing exp_dir-string
# switch sign and coordinate-flags to avoid negative values appearing in exp_dir-name
# Now start constructing expdir-string
# switch sign and coordinate-flags to avoid negative values appearing in expdir-name
if sw_c[0] < 0.:
sw_c[0] = np.abs(sw_c[0])
flag_coords[0] = "S"
......@@ -112,7 +117,7 @@ class MetaData:
expdir, expname = path_parts[0], path_parts[1]
# extend exp_dir_in successively (splitted up for better readability)
# extend expdir_in successively (split up for better readability)
expname += "-"+str(self.nx) + "x" + str(self.ny)
expname += "-"+(("{0: 05.2f}"+flag_coords[0]+"{1:05.2f}"+flag_coords[1]).format(*sw_c)).strip().replace(".","")+"-"
......@@ -139,8 +144,13 @@ class MetaData:
"expdir" : self.expdir}
meta_dict["sw_corner_frame"] = {
"lat" : self.sw_c[0],
"lon" : self.sw_c[1]
"lat" : np.around(self.sw_c[0],decimals=2),
"lon" : np.around(self.sw_c[1],decimals=2)
}
meta_dict["coordinates"] = {
"lat" : np.around(self.lat,decimals=2).tolist(),
"lon" : np.around(self.lon,decimals=2).tolist()
}
meta_dict["frame_size"] = {
......@@ -150,7 +160,7 @@ class MetaData:
meta_dict["variables"] = []
for i in range(len(self.varnames)):
print(self.varnames[i])
#print(self.varnames[i])
meta_dict["variables"].append(
{"var"+str(i+1) : self.varnames[i]})
......@@ -164,13 +174,18 @@ class MetaData:
meta_fname = os.path.join(dest_dir,"metadata.json")
if os.path.exists(meta_fname): # check if a metadata-file already exists and check its content
print(method_name+": json-file ('"+meta_fname+"' already exists. Its content will be checked...")
self.status = "old" # set status to old in order to prevent repeated modification of shell-/Batch-scripts
with open(meta_fname,'r') as js_file:
dict_dupl = json.load(js_file)
if dict_dupl != meta_dict:
print(method_name+": Already existing metadata (see '"+meta_fname+") do not fit data being processed right now. Ensure a common data base.")
sys.exit(1)
meta_fname_dbg = os.path.join(dest_dir,"metadata_debug.json")
print(method_name+": Already existing metadata (see '"+meta_fname+"') do not fit data being processed right now (see '" \
+meta_fname_dbg+"'. Ensure a common data base.")
with open(meta_fname_dbg,'w') as js_file:
json.dump(meta_dict,js_file)
raise ValueError
else: #do not need to do anything
pass
else:
......@@ -189,20 +204,24 @@ class MetaData:
with open(js_file) as js_file:
dict_in = json.load(js_file)
self.exp_dir = dict_in["exp_dir"]
self.expdir = dict_in["expdir"]
self.sw_c = [dict_in["sw_corner_frame"]["lat"],dict_in["sw_corner_frame"]["lon"] ]
self.lat = dict_in["coordinates"]["lat"]
self.lon = dict_in["coordinates"]["lon"]
self.nx = dict_in["frame_size"]["nx"]
self.ny = dict_in["frame_size"]["ny"]
self.variables = [dict_in["variables"][ivar] for ivar in dict_in["variables"].keys()]
# dict_in["variables"] is a list like [{var1: varname1},{var2: varname2},...]
list_of_dict_aux = dict_in["variables"]
# iterate through the list with an integer ivar
# note: the naming of the variables starts with var1, thus add 1 to the iterator
self.variables = [list_of_dict_aux[ivar]["var"+str(ivar+1)] for ivar in range(len(list_of_dict_aux))]
def write_dirs_to_batch_scripts(self,batch_script):
"""
Expands ('known') directory-variables in batch_script by exp_dir-attribute of class instance
Expands ('known') directory-variables in batch_script by expdir-attribute of class instance
"""
paths_to_mod = ["source_dir=","destination_dir=","checkpoint_dir=","results_dir="] # known directory-variables in batch-scripts
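The body of write_dirs_to_batch_scripts is truncated in this diff. The following is a hedged sketch (illustrative only, not the repository's implementation) of how such an expansion could look, assuming the paths_to_mod convention above and that the expdir-attribute holds the experiment directory name:

def write_dirs_to_batch_scripts_sketch(self, batch_script):
    # known directory-variables in batch-scripts (cf. paths_to_mod above)
    paths_to_mod = ["source_dir=", "destination_dir=", "checkpoint_dir=", "results_dir="]
    with open(batch_script, "r") as script_file:
        lines = script_file.readlines()
    for i, line in enumerate(lines):
        if any(line.lstrip().startswith(prefix) for prefix in paths_to_mod):
            # append the experiment directory (expdir-attribute) to the existing base path
            lines[i] = line.rstrip("\n").rstrip("/") + "/" + self.expdir + "\n"
    with open(batch_script, "w") as script_file:
        script_file.writelines(lines)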
......@@ -224,6 +243,7 @@ class MetaData:
def write_destdir_jsontmp(dest_dir, tmp_dir = None):
"""
Writes dest_dir to temporary json-file (temp.json) stored in the current working directory.
To be executed by Master node in parallel mode.
"""
if not tmp_dir: tmp_dir = os.getcwd()
......@@ -259,6 +279,34 @@ class MetaData:
else:
return(dict_tmp.get("destination_dir"))
@staticmethod
def wait_for_jsontmp(tmp_dir = None, waittime = 10, delay=0.5):
"""
Waits at max. waittime (in sec) until temp.json-file becomes available
"""
method_name = MetaData.wait_for_jsontmp.__name__+" of Class "+MetaData.__name__
if not tmp_dir: tmp_dir = os.getcwd()
file_tmp = os.path.join(tmp_dir,"temp.json")
counter_max = waittime/delay
counter = 0
status = "not_ok"
while (counter <= counter_max):
if os.path.isfile(file_tmp):
status = "ok"
break
else:
time.sleep(delay)
counter += 1
if status != "ok": raise IOError(method_name+": '"+file_tmp+ \
"' does not exist after waiting for "+str(waittime)+" sec.")
@staticmethod
def issubset(a,b):
......
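For orientation, a hedged usage sketch (not part of this commit) of how a worker process might combine MetaData.wait_for_jsontmp with a plain JSON read to obtain the destination directory written by the master node; the file name temp.json and the key destination_dir follow the methods shown above:

import json
import os
from metadata import MetaData  # as imported in era5_dataset_v2.py

def fetch_destination_dir(tmp_dir=None):
    tmp_dir = tmp_dir or os.getcwd()
    # block until the master node has written temp.json (waits at most 10 s)
    MetaData.wait_for_jsontmp(tmp_dir=tmp_dir, waittime=10, delay=0.5)
    with open(os.path.join(tmp_dir, "temp.json")) as js_file:
        dict_tmp = json.load(js_file)
    return dict_tmp.get("destination_dir")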
......@@ -7,7 +7,7 @@ from .kth_dataset import KTHVideoDataset
from .ucf101_dataset import UCF101VideoDataset
from .cartgripper_dataset import CartgripperVideoDataset
from .era5_dataset_v2 import ERA5Dataset_v2
from .era5_dataset_v2_anomaly import ERA5Dataset_v2_anomaly
#from .era5_dataset_v2_anomaly import ERA5Dataset_v2_anomaly
def get_dataset_class(dataset):
dataset_mappings = {
......@@ -19,7 +19,7 @@ def get_dataset_class(dataset):
'ucf101': 'UCF101VideoDataset',
'cartgripper': 'CartgripperVideoDataset',
"era5":"ERA5Dataset_v2",
"era5_anomaly":"ERA5Dataset_v2_anomaly",
# "era5_anomaly":"ERA5Dataset_v2_anomaly",
}
dataset_class = dataset_mappings.get(dataset, dataset)
print("datset_class",dataset_class)
......
......@@ -5,7 +5,6 @@ import os
import pickle
import random
import re
import hickle as hkl
import numpy as np
import json
import tensorflow as tf
......@@ -17,9 +16,14 @@ sys.path.append(path.abspath('../../workflow_parallel_frame_prediction/'))
import DataPreprocess.process_netCDF_v2
from DataPreprocess.process_netCDF_v2 import get_unique_vars
from DataPreprocess.process_netCDF_v2 import Calc_data_stat
from metadata import MetaData
#from base_dataset import VarLenFeatureVideoDataset
from collections import OrderedDict
from tensorflow.contrib.training import HParams
from mpi4py import MPI
import glob
class ERA5Dataset_v2(VarLenFeatureVideoDataset):
def __init__(self, *args, **kwargs):
......@@ -28,6 +32,7 @@ class ERA5Dataset_v2(VarLenFeatureVideoDataset):
example = next(tf.python_io.tf_record_iterator(self.filenames[0]))
dict_message = MessageToDict(tf.train.Example.FromString(example))
feature = dict_message['features']['feature']
print("features in dataset:",feature.keys())
self.video_shape = tuple(int(feature[key]['int64List']['value'][0]) for key in ['sequence_length','height', 'width', 'channels'])
self.image_shape = self.video_shape[1:]
self.state_like_names_and_shapes['images'] = 'images/encoded', self.image_shape
......@@ -57,7 +62,6 @@ class ERA5Dataset_v2(VarLenFeatureVideoDataset):
sequence_lengths = [int(sequence_length.strip()) for sequence_length in sequence_lengths]
return np.sum(np.array(sequence_lengths) >= self.hparams.sequence_length)
def filter(self, serialized_example):
return tf.convert_to_tensor(True)
......@@ -70,7 +74,8 @@ class ERA5Dataset_v2(VarLenFeatureVideoDataset):
'height': tf.FixedLenFeature([], tf.int64),
'sequence_length': tf.FixedLenFeature([], tf.int64),
'channels': tf.FixedLenFeature([],tf.int64),
# 'images/encoded': tf.FixedLenFeature([], tf.string)
#'t_start': tf.FixedLenFeature([], tf.string),
't_start': tf.VarLenFeature(tf.int64),
'images/encoded': tf.VarLenFeature(tf.float32)
}
......@@ -79,23 +84,17 @@ class ERA5Dataset_v2(VarLenFeatureVideoDataset):
parsed_features = tf.parse_single_example(serialized_example, keys_to_features)
print ("Parse features", parsed_features)
seq = tf.sparse_tensor_to_dense(parsed_features["images/encoded"])
T_start = tf.sparse_tensor_to_dense(parsed_features["t_start"])
print("T_start in make dataset_v2", T_start)
#width = tf.sparse_tensor_to_dense(parsed_features["width"])
# height = tf.sparse_tensor_to_dense(parsed_features["height"])
# channels = tf.sparse_tensor_to_dense(parsed_features["channels"])
# sequence_length = tf.sparse_tensor_to_dense(parsed_features["sequence_length"])
images = []
# for i in range(20):
# images.append(parsed_features["images/encoded"].values[i])
# images = parsed_features["images/encoded"]
# images = tf.map_fn(lambda i: tf.image.decode_jpeg(parsed_features["images/encoded"].values[i]),offsets)
# seq = tf.sparse_tensor_to_dense(parsed_features["images/encoded"], '')
# Parse the string into an array of pixels corresponding to the image
# images = tf.decode_raw(parsed_features["images/encoded"],tf.int32)
# images = seq
print("Image shape {}, {},{},{}".format(self.video_shape[0],self.image_shape[0],self.image_shape[1], self.image_shape[2]))
images = tf.reshape(seq, [self.video_shape[0],self.image_shape[0],self.image_shape[1], self.image_shape[2]], name = "reshape_new")
seqs["images"] = images
seqs["T_start"] = T_start
return seqs
filenames = self.filenames
print ("FILENAMES",filenames)
......@@ -121,7 +120,6 @@ class ERA5Dataset_v2(VarLenFeatureVideoDataset):
# dataset = dataset.apply(tf.contrib.data.map_and_batch(
# _parser, batch_size, drop_remainder=True, num_parallel_calls=num_parallel_calls)) # Bing: parallel data mapping; num_parallel_calls depends on the hardware, but should usually equal the number of usable CPUs
dataset = dataset.prefetch(batch_size) # Bing: buffer the data in order to save waiting time for the GPU
return dataset
......@@ -144,19 +142,21 @@ def _floats_feature(value):
def _int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def save_tf_record(output_fname, sequences):
print('saving sequences to %s' % output_fname)
def save_tf_record(output_fname, sequences,T_start_points):
with tf.python_io.TFRecordWriter(output_fname) as writer:
for sequence in sequences:
for i in range(len(sequences)):
sequence = sequences[i]
T_start = T_start_points[i][0].strftime("%Y%m%d%H")
print("T_start:",T_start)
num_frames = len(sequence)
height, width, channels = sequence[0].shape
encoded_sequence = np.array([list(image) for image in sequence])
features = tf.train.Features(feature={
'sequence_length': _int64_feature(num_frames),
'height': _int64_feature(height),
'width': _int64_feature(width),
'channels': _int64_feature(channels),
't_start': _int64_feature(int(T_start)),
'images/encoded': _floats_feature(encoded_sequence.flatten()),
})
example = tf.train.Example(features=features)
......@@ -251,46 +251,62 @@ class Norm_data:
return(data[...] * getattr(self,varname+"sigma")**2 + getattr(self,varname+"avg"))
def read_frames_and_save_tf_records(output_dir,input_dir,partition_name,vars_in,seq_length=20,sequences_per_file=128,height=64,width=64,channels=3,**kwargs):#Bing: original 128
def read_frames_and_save_tf_records(stats,output_dir,input_file, temp_input_file, vars_in,year,month,seq_length=20,sequences_per_file=128,height=64,width=64,channels=3,**kwargs):#Bing: original 128
"""
Read the monthly pickle files, process them and save the sequences to TFRecord files.
stats: dict, statistics information retrieved from the pickle directory
input_file: str, absolute path to the pickle file
file_info: list with three elements: partition name (train, val or test), year and month, e.g. [train, 1, 2]
"""
# ML 2020/04/08:
# Include vars_in for more flexible data handling (normalization and reshaping)
# and optional keyword argument for kind of normalization
print ("read_frames_and_save_tf_records function")
if 'norm' in kwargs:
norm = kwargs.get("norm")
else:
norm = "minmax"
print("Make use of default minmax-normalization...")
output_dir = os.path.join(output_dir,partition_name)
os.makedirs(output_dir,exist_ok=True)
norm_cls = Norm_data(vars_in) # init normalization-instance
nvars = len(vars_in)
# open statistics file and feed it to norm-instance
with open(os.path.join(input_dir,"statistics.json")) as js_file:
norm_cls.check_and_set_norm(json.load(js_file),norm)
#with open(os.path.join(input_dir,"statistics.json")) as js_file:
norm_cls.check_and_set_norm(stats,norm)
sequences = []
T_start_points = []
sequence_iter = 0
sequence_lengths_file = open(os.path.join(output_dir, 'sequence_lengths.txt'), 'w')
X_train = hkl.load(os.path.join(input_dir, "X_" + partition_name + ".hkl"))
#sequence_lengths_file = open(os.path.join(output_dir, 'sequence_lengths.txt'), 'w')
#Bing 2020/07/16
#print ("open intput dir,",input_file)
with open(input_file, "rb") as data_file:
X_train = pickle.load(data_file)
with open(temp_input_file,"rb") as temp_file:
T_train = pickle.load(temp_file)
#print("T_train:",T_train)
#check to make sure that X_train and T_train have the same length
assert (len(X_train) == len(T_train))
X_possible_starts = [i for i in range(len(X_train) - seq_length)]
for X_start in X_possible_starts:
print("Interation", sequence_iter)
X_end = X_start + seq_length
#seq = X_train[X_start:X_end, :, :,:]
seq = X_train[X_start:X_end,:,:]
#print("*****len of seq ***.{}".format(len(seq)))
#seq = list(np.array(seq).reshape((len(seq), 64, 64, 3)))
seq = X_train[X_start:X_end,:,:,:]
#Record the start point of the timestamps
T_start = T_train[X_start]
#print("T_start:",T_start)
seq = list(np.array(seq).reshape((seq_length, height, width, nvars)))
if not sequences:
last_start_sequence_iter = sequence_iter
print("reading sequences starting at sequence %d" % sequence_iter)
sequences.append(seq)
T_start_points.append(T_start)
sequence_iter += 1
sequence_lengths_file.write("%d\n" % len(seq))
if len(sequences) == sequences_per_file:
###Normalization should adopt the selected variables, here we used duplicated channel temperature variables
......@@ -299,13 +315,30 @@ def read_frames_and_save_tf_records(output_dir,input_dir,partition_name,vars_in,
for i in range(nvars):
sequences[:,:,:,:,i] = norm_cls.norm_var(sequences[:,:,:,:,i],vars_in[i],norm)
output_fname = 'sequence_{0}_to_{1}.tfrecords'.format(last_start_sequence_iter, sequence_iter - 1)
output_fname = 'sequence_Y_{}_M_{}_{}_to_{}.tfrecords'.format(year,month,last_start_sequence_iter,sequence_iter - 1)
output_fname = os.path.join(output_dir, output_fname)
save_tf_record(output_fname, list(sequences))
print("T_start_points:",T_start_points)
save_tf_record(output_fname, list(sequences), T_start_points)
T_start_points = []
sequences = []
print("Finished for input file",input_file)
#sequence_lengths_file.close()
return
def write_sequence_file(output_dir,seq_length,sequences_per_file):
partition_names = ["train","val","test"]
for partition_name in partition_names:
save_output_dir = os.path.join(output_dir,partition_name)
tfCounter = len(glob.glob1(save_output_dir,"*.tfrecords"))
print("Partition_name: {}, number of tfrecords: {}".format(partition_name,tfCounter))
sequence_lengths_file = open(os.path.join(save_output_dir, 'sequence_lengths.txt'), 'w')
for i in range(tfCounter*sequences_per_file):
sequence_lengths_file.write("%d\n" % seq_length)
sequence_lengths_file.close()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("input_dir", type=str, help="directory containing the processed directories ""boxing, handclapping, handwaving, ""jogging, running, walking")
......@@ -316,15 +349,115 @@ def main():
parser.add_argument("-height",type=int,default=64)
parser.add_argument("-width",type = int,default=64)
parser.add_argument("-seq_length",type=int,default=20)
parser.add_argument("-sequences_per_file",type=int,default=2)
args = parser.parse_args()
current_path = os.getcwd()
#input_dir = "/Users/gongbing/PycharmProjects/video_prediction/splits"
#output_dir = "/Users/gongbing/PycharmProjects/video_prediction/data/era5"
partition_names = ['train','val', 'test'] #64,64,3 val has issue#
#partition_names = ['train','val', 'test'] #64,64,3 val has issue#
############################################################
# CONTROLLING variable! Needs to be adapted manually!!!
############################################################
partition = {
"train":{
# "2222":[1,2,3,5,6,7,8,9,10,11,12],
# "2010_1":[1,2,3,4,5,6,7,8,9,10,11,12],
# "2012":[1,2,3,4,5,6,7,8,9,10,11,12],
# "2013_complete":[1,2,3,4,5,6,7,8,9,10,11,12],
# "2015":[1,2,3,4,5,6,7,8,9,10,11,12],
"2017_test":[1,2,3,4,5,6,7,8,9,10]
},
"val":
{"2017_test":[11]
},
"test":
{"2017_test":[12]
}
}
# initialize MPI
comm = MPI.COMM_WORLD
my_rank = comm.Get_rank() # rank of the node
p = comm.Get_size() # number of assigned nodes
if my_rank == 0 :
# retrieve final statistics first (not parallelized!)
# some preparatory steps
stat_dir_prefix = args.input_dir
varnames = args.variables
vars_uni, varsind, nvars = get_unique_vars(varnames)
stat_obj = Calc_data_stat(nvars) # init statistic-instance
# loop over whole data set (training, dev and test set) to collect the intermediate statistics
print("Start collecting statistics from the whole datset to be processed...")
for split in partition.keys():
values = partition[split]
for year in values.keys():
file_dir = os.path.join(stat_dir_prefix,year)
for month in values[year]:
# process stat-file:
stat_obj.acc_stat_master(file_dir,int(month)) # process monthly statistic-file
# finalize statistics and write to json-file
stat_obj.finalize_stat_master(vars_uni)
stat_obj.write_stat_json(args.input_dir)
# organize parallelized partitioning
partition_year_month = [] # contains lists of lists; each inner list holds three elements: [partition_name, year, month]
partition_names = list(partition.keys())
print ("partition_names:",partition_names)
broadcast_lists = []
for partition_name in partition_names:
read_frames_and_save_tf_records(output_dir=args.output_dir,input_dir=args.input_dir,vars_in=args.variables,partition_name=partition_name, seq_length=args.seq_length,height=args.height,width=args.width,sequences_per_file=2) #Bing: Todo need check the N_seq
#ead_frames_and_save_tf_records(output_dir = output_dir, input_dir = input_dir,partition_name = partition_name, N_seq=20) #Bing: TODO: first try for N_seq is 10, but it met loading data issue. let's try 5
partition_data = partition[partition_name]
years = list(partition_data.keys())
broadcast_lists.append([partition_name,years])
for nodes in range(1,p):
#ibroadcast_list = [partition_name,years,nodes]
#broadcast_lists.append(broadcast_list)
comm.send(broadcast_lists,dest=nodes)
message_counter = 1
while message_counter <= 12:
message_in = comm.recv()
message_counter = message_counter + 1
print("Message in from slaver",message_in)
write_sequence_file(args.output_dir,args.seq_length,args.sequences_per_file)
#write_sequence_file
else:
message_in = comm.recv()
print ("My rank,", my_rank)
print("message_in",message_in)
# open statistics file and feed it to norm-instance
print("Opening json-file: "+os.path.join(args.input_dir,"statistics.json"))
with open(os.path.join(args.input_dir,"statistics.json")) as js_file:
stats = json.load(js_file)
#loop the partitions (train,val,test)
for partition in message_in:
print("partition on slave ",partition)
partition_name = partition[0]
save_output_dir = os.path.join(args.output_dir,partition_name)
for year in partition[1]:
input_file = "X_" + '{0:02}'.format(my_rank) + ".pkl"
temp_file = "T_" + '{0:02}'.format(my_rank) + ".pkl"
input_dir = os.path.join(args.input_dir,year)
temp_file = os.path.join(input_dir,temp_file )
input_file = os.path.join(input_dir,input_file)
# create the tfrecords-files
read_frames_and_save_tf_records(year=year,month=my_rank,stats=stats,output_dir=save_output_dir, \
input_file=input_file,temp_input_file=temp_file,vars_in=args.variables, \
partition_name=partition_name,seq_length=args.seq_length, \
height=args.height,width=args.width,sequences_per_file=args.sequences_per_file)
print("Year {} finished",year)
message_out = ("Node:",str(my_rank),"finished","","\r\n")
print ("Message out for slaves:",message_out)
comm.send(message_out,dest=0)
MPI.Finalize()
if __name__ == '__main__':
main()
......