diff --git a/.gitignore b/.gitignore index b48e8214fba4effae015b5126201b7b1bcb781fb..5fccf918ab20f20390c83b6074f6c1c05878676c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ -# Byte-compiled / optimized / DLL files +# Byt-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class @@ -105,16 +105,16 @@ virt_env*/ .idea/* +*/.idea *.DS_Store # Ignore log- and errorfiles -*-err.[0-9]* -*-out.[0-9]* +*-err*.[0-9]* +*-out*.[0-9]* #Ignore the results files - **/results_test_samples **/logs **/vp @@ -122,8 +122,10 @@ virt_env*/ *.h5 # Ignore (Batch) runscripts -batch_scripts/** -!batch_scripts/*_template.sh +**/batch_scripts/*.sh +!*_template.sh -!batch_scripts/wrapper_container.sh +!*wrapper_container.sh +# Ignore singularity containers (typically existing as symbolic link) +*.sif diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 26d33521af10bcc7fd8cea344038eaaeb78d0ef5..0000000000000000000000000000000000000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml diff --git a/.idea/downscaling-rnn-gan_leinonen2020.iml b/.idea/downscaling-rnn-gan_leinonen2020.iml deleted file mode 100644 index 8b8c395472a5a6b3598af42086e590417ace9933..0000000000000000000000000000000000000000 --- a/.idea/downscaling-rnn-gan_leinonen2020.iml +++ /dev/null @@ -1,12 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<module type="PYTHON_MODULE" version="4"> - <component name="NewModuleRootManager"> - <content url="file://$MODULE_DIR$" /> - <orderEntry type="inheritedJdk" /> - <orderEntry type="sourceFolder" forTests="false" /> - </component> - <component name="PyDocumentationSettings"> - <option name="format" value="PLAIN" /> - <option name="myDocStringFormat" value="Plain" /> - </component> -</module> \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml deleted file mode 100644 index 81ed3fcfa4851f8a199d530e35a9e1caae8046cc..0000000000000000000000000000000000000000 --- a/.idea/inspectionProfiles/Project_Default.xml +++ /dev/null @@ -1,21 +0,0 @@ -<component name="InspectionProjectProfileManager"> - <profile version="1.0"> - <option name="myName" value="Project Default" /> - <inspection_tool class="PyCompatibilityInspection" enabled="true" level="WARNING" enabled_by_default="true"> - <option name="ourVersions"> - <value> - <list size="3"> - <item index="0" class="java.lang.String" itemvalue="2.7" /> - <item index="1" class="java.lang.String" itemvalue="3.6" /> - <item index="2" class="java.lang.String" itemvalue="3.9" /> - </list> - </value> - </option> - </inspection_tool> - <inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false"> - <option name="processCode" value="true" /> - <option name="processLiterals" value="true" /> - <option name="processComments" value="true" /> - </inspection_tool> - </profile> -</component> \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml deleted file mode 100644 index 105ce2da2d6447d11dfe32bfb846c3d5b199fc99..0000000000000000000000000000000000000000 --- a/.idea/inspectionProfiles/profiles_settings.xml +++ /dev/null @@ -1,6 +0,0 @@ -<component name="InspectionProjectProfileManager"> - <settings> - <option name="USE_PROJECT_PROFILE" value="false" /> - <version value="1.0" /> - </settings> -</component> \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml deleted 
file mode 100644 index 65531ca992813bbfedbe43dfae5a5f4337168ed8..0000000000000000000000000000000000000000 --- a/.idea/misc.xml +++ /dev/null @@ -1,4 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project version="4"> - <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.6" project-jdk-type="Python SDK" /> -</project> \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index 441c07170dea6fa26fa687ad1c1b701c5392f0b3..0000000000000000000000000000000000000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project version="4"> - <component name="ProjectModuleManager"> - <modules> - <module fileurl="file://$PROJECT_DIR$/.idea/downscaling-rnn-gan_leinonen2020.iml" filepath="$PROJECT_DIR$/.idea/downscaling-rnn-gan_leinonen2020.iml" /> - </modules> - </component> -</project> \ No newline at end of file diff --git a/downscaling_gan/README.md b/downscaling_gan/README.md index 39a4c40c9edba3cc1f273c12b0c1c3609e1c4d69..4ccce3553684576e3b9e493b7329982aeda355eb 100644 --- a/downscaling_gan/README.md +++ b/downscaling_gan/README.md @@ -4,17 +4,78 @@ This is a reference implementation of a stochastic, recurrent super-resolution G ## Important note -The above mentioned work serves as a starting point of this project where the focus is to downscale 2m temperature from ERA5-reanalysis data to IFS HRES (short-term) forecasts in the schope of WP 5 in MAELSTROM. +The above-mentioned work may serve as a starting point for statistical downscaling in the scope of WP 5 in the [MAELSTROM project](). However, we started off with a simple U-net to downscale the 2m temperature (see the `downscaling_gan`-directory of this branch). Nonetheless, the original code provided by Leinonen et al., 2020 can be run on JSC's HPC-system as detailed below. The original description for running the code (provided [here]()) has been complemented for this purpose. + +## Get the repository +Simply clone this repository to your desired directory by +``` +# Either use... +git clone git@gitlab.jsc.fz-juelich.de:esde/machine-learning/downscaling_maelstrom.git # ...with SSH +# ...or... +git clone https://gitlab.jsc.fz-juelich.de/esde/machine-learning/downscaling_maelstrom.git # with HTTPS +``` +Ensure that your directory provides a reasonable amount of space (i.e. a few GB). On JSC's HPC-system, it is recommended to choose a directory under `/p/project/` and to avoid cloning in your home-directory! ## Obtaining the data -The radar precipitation dataset (MCH-RZC in the paper) can be downloaded at https://doi.org/10.7910/DVN/ZDWWMG by following the instructions there. The GOES cloud optical thickness dataset (GOES-COT) can be found [in this data repository](https://doi.org/10.5281/zenodo.3835849) as "goes-samples-2019-128x128.nc". +The radar precipitation dataset (MCH-RZC in the paper) can be downloaded at https://doi.org/10.7910/DVN/ZDWWMG by following the instructions there. The GOES cloud optical thickness dataset (GOES-COT) can be found [in this data repository](https://doi.org/10.5281/zenodo.3835849) as "goes-samples-2019-128x128.nc". <br> +On JUST, the data is already made available under `/p/project/deepacf/maelstrom/data/downscaling_gan_leinonen/`. ## Obtaining the trained network The trained generator weights selected for use in the paper are included in the `models` directory. The weights for the other time steps can be found [here](https://doi.org/10.5281/zenodo.3835849). 
-## Running the code +## Running the code on JSC's HPC-system +To train Leinonen's GAN-model on the provided dataset by yourself, a batch-script-based job submission on JSC's HPC-system is made available. The environment as well as the provided template runscript under the `batch_scripts`-directory may also serve for testing on other datasets. In the following, the installation of the virtual environment and the required system-side software preparations are described. + +### Getting a TensorFlow v1.15-container with Nvidia support +The provided source-code requires TensorFlow v1.15 for training. To allow training on the HPC-system (Juwels, Juwels Booster and HDF-ML) while exploiting the system's GPU capacities, a singularity container of this rather old TensorFlow version with Nvidia support is mandatory. Such containers are distributed [here](https://docs.nvidia.com/deeplearning/frameworks/tensorflow-release-notes/overview.html#overview). Currently (2021-11-18), the [TensorFlow release 2021.09](https://docs.nvidia.com/deeplearning/frameworks/tensorflow-release-notes/rel_21-09.html#rel_21-09) can be used since the container is shipped with an OFED- and MPI-version that fits the versions of JSC's HPC-system. <br><br> +This container is already available under `/p/project/deepacf/deeprain/video_prediction_shared_folder/containers_juwels_booster/`. <br> +To avoid duplicates of big singularity containers, it is recommended to link the respective file to the `env_setup`-directory of this repository: +``` +cd env_setup +ln -s /p/project/deepacf/deeprain/video_prediction_shared_folder/containers_juwels_booster/nvidia_tensorflow_21.09-tf1-py3.sif tensorflow_21.09-tf1-py3.sif +``` +Note that the (linked) container-file must be named appropriately, i.e. as expected by `create_env.sh`! + +### Setting up the virtual environment +The singularity container lacks the netCDF-package for Python which is required to read the input data. +Thus, we need to set up a virtual environment within the singularity container. The helper script `create_env.sh` accomplishes this together with `install_venv_container.sh`: +``` +# in env_setup-directory +source create_env.sh <venv_name> # please set <venv_name> +``` +This will create a virtual environment under the `virtual_envs`-directory which is created under the base-directory. + +### Running the training +To run the code, please create a copy of the batch-script template `train_downscaling_GAN_template.sh` and edit this script to your personal needs, i.e. set your e-mail address in the SLURM-commands, remove the template header, set the name of the virtual environment according to the previous step, choose an application (e.g. "mchrzc" or "goescod") and adjust the input and output paths.<br> +Afterwards, the job can be submitted. +``` +cp train_downscaling_GAN_template.sh train_downscaling_GAN_<some_string>.sh # please set <some_string> +# Modify the created batch-script as mentioned above +sbatch train_downscaling_GAN_<some_string>.sh +``` +Again, please choose an output-directory that +1. provides sufficient space for the output (> some GBs) and +2. is accessible for the computing nodes (e.g. under `/p/project/[...]`) + +### Producing plots +For evaluating the trained model and for creating the plots, the batch-script `plot_downscaling_GAN_template.sh` is provided. As in the training step, the template should be duplicated and edited. +Additionally, the `h5`-files from the training or the pre-trained model must be linked to the `models`-directory. 
+Important note: If you want to evaluate your own model, set the variables `mchrzc_gen_weights_fn` and `goes_gen_weights_fn` in the `plot_all`-function of `dsrnngan/plots.py` accordingly! +``` +# Change to models-directory and link the (pre-)trained models +cd models +ln -s <training_outdir>/<trained_model_for_mchrzc>.h5 ./<trained_model_for_mchrzc>.h5 +ln -s <training_outdir>/<trained_model_for_goes>.h5 ./<trained_model_for_goes>.h5 +# Change to batch_scripts-directory +cd ../batch_scripts +cp plot_downscaling_GAN_template.sh plot_downscaling_GAN_<some_string>.sh # please set <some_string> +# Modify the created batch-script analogously to the above +sbatch plot_downscaling_GAN_<some_string>.sh +``` + +## Running the code somewhere else (original description) For training, you'll want a machine with a GPU and around 32 GB of memory (the training procedure for the radar dataset loads the entire dataset into memory). Running the pre-trained model should work just fine on a CPU. diff --git a/downscaling_gan/batch_scripts/plot_downscaling_GAN_template.sh b/downscaling_gan/batch_scripts/plot_downscaling_GAN_template.sh new file mode 100644 index 0000000000000000000000000000000000000000..5e10f63ad31e82dbf0c7ebdd0192f3a970811c31 --- /dev/null +++ b/downscaling_gan/batch_scripts/plot_downscaling_GAN_template.sh @@ -0,0 +1,54 @@ +#!/bin/bash -x +#SBATCH --account=deepacf +#SBATCH --nodes=1 +#SBATCH --ntasks=1 +##SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task=1 +#SBATCH --output=plot_downscaling_GAN-out.%j +#SBATCH --error=plot_downscaling_GAN-err.%j +#SBATCH --time=24:00:00 +#SBATCH --gres=gpu:1 +#SBATCH --partition=batch +#SBATCH --mail-type=ALL +#SBATCH --mail-user=m.langguth@fz-juelich.de + +######### Template identifier (don't remove) ######### +echo "Do not run the template scripts" +exit 99 +######### Template identifier (don't remove) ######### + +# set some paths and variables +DATE_NOW=$(date +%Y%m%dT%H%M%S) +WORK_DIR=`pwd` +BASE_DIR=$(dirname "$WORK_DIR") +# Name of virtual environment +VIRT_ENV_NAME="${WORK_DIR}/virtual_envs/<my_venv>" +# Name of container image (must be available in working directory) +CONTAINER_IMG="${WORK_DIR}/tensorflow_21.09-tf1-py3.sif" + +# simple sanity checks +if ! [[ -f ${VIRT_ENV_NAME}/bin/activate ]]; then + echo "ERROR: Requested virtual environment ${VIRT_ENV_NAME} not found..." + exit 1 +fi + +if ! [[ -f ${CONTAINER_IMG} ]]; then + echo "ERROR: Required singularity container ${CONTAINER_IMG} not found..." 
+ exit 1 +fi + +# clean-up modules to avoid conflicts between host and container settings +module purge + +# variables for settings +data_dir="<path_to_data>" +mchrzc_file="mchrzc_samples-2018-128x128.nc" +goes_file="goes_samples-2019-128x128.nc" + +# Please uncomment the following CUDA configuration +export CUDA_VISIBLE_DEVICES=1 + +# run training +srun --mpi=pspmix --cpu-bind=none \ +singularity exec --nv ${CONTAINER_IMG} ./wrapper_container.sh ${VIRT_ENV_NAME} python3 ${BASE_DIR}/dsrnngan/main.py plot \ + --mchrzc_data_file=${data_dir}/${mchrzc_file} --goescod_data_file=${data_dir}/${goes_file} diff --git a/downscaling_gan/batch_scripts/train_downscaling_GAN_template.sh b/downscaling_gan/batch_scripts/train_downscaling_GAN_template.sh index 4cadd61a17b3319248dbd7a541fa5b84e133c526..b8710291b0943b061e8376f5121bff2c7ba91bd6 100644 --- a/downscaling_gan/batch_scripts/train_downscaling_GAN_template.sh +++ b/downscaling_gan/batch_scripts/train_downscaling_GAN_template.sh @@ -6,47 +6,51 @@ #SBATCH --cpus-per-task=1 #SBATCH --output=train_downscaling_GAN-out.%j #SBATCH --error=train_downscaling_GAN-err.%j -#SBATCH --time=00:20:00 +#SBATCH --time=24:00:00 #SBATCH --gres=gpu:1 #SBATCH --partition=batch #SBATCH --mail-type=ALL #SBATCH --mail-user=m.langguth@fz-juelich.de -##jutil env activate -p cjjsc42 ######### Template identifier (don't remove) ######### echo "Do not run the template scripts" exit 99 ######### Template identifier (don't remove) ######### -# Name of virtual environment -#VIRT_ENV_NAME="my_venv" - -# Loading mouldes -source ../env_setup/modules_train.sh -# Activate virtual environment if needed (and possible) -#if [ -z ${VIRTUAL_ENV} ]; then -# if [[ -f ../${VIRT_ENV_NAME}/bin/activate ]]; then -# echo "Activating virtual environment..." -# source ../${VIRT_ENV_NAME}/bin/activate -# else -# echo "ERROR: Requested virtual environment ${VIRT_ENV_NAME} not found..." -# exit 1 -# fi -#fi +# set some paths and variables +DATE_NOW=$(date +%Y%m%dT%H%M%S) +WORK_DIR=`pwd` +BASE_DIR=$(dirname "$WORK_DIR") +# Name of virtual environment +VIRT_ENV_NAME="${WORK_DIR}/virtual_envs/<my_venv>" +# Name of container image (must be available in working directory) +CONTAINER_IMG="${WORK_DIR}/tensorflow_21.09-tf1-py3.sif" + +# simple sanity checks +if ! [[ -f ${VIRT_ENV_NAME}/bin/activate ]]; then + echo "ERROR: Requested virtual environment ${VIRT_ENV_NAME} not found..." + exit 1 +fi + +if ! [[ -f ${CONTAINER_IMG} ]]; then + echo "ERROR: Required singularity containr ${CONTAINER_IMG} not found..." 
+ exit 1 +fi + +# clean-up modules to avoid conflicts between host and container settings +module purge # variables for settings application="mchrzc" -data_in="" -dest_file_wgt="" -dest_file_log="" +data_in="<path_to_data>/${application}_2018-128x128.nc" # replace 2018 by 2019 for goes-application +dest_file_wgt="<path_to_store_weights>" +dest_file_log="${dest_file_wgt}/logging_${DATE_NOW}" # Please uncomment the following CUDA configuration -#CUDA_VISIBLE_DEVICES=1 +export CUDA_VISIBLE_DEVICES=1 # run training -srun python ../dsrnngan/main_train.py --application=${application} --data_file=${data_in} \ +srun --mpi=pspmix --cpu-bind=none \ +singularity exec --nv ${CONTAINER_IMG} ./wrapper_container.sh ${VIRT_ENV_NAME} python3 ${BASE_DIR}/dsrnngan/main.py train \ + --application=${application} --data_file=${data_in} \ --save_weights_root=${dest_file_wgt} --log_path ${dest_file_log} - - - - diff --git a/downscaling_gan/batch_scripts/wrapper_container.sh b/downscaling_gan/batch_scripts/wrapper_container.sh index cadda1a15c992344bd10bb5943096d454d7fcad5..8fe9e1613c691880145cff804952ba0918f73f18 100755 --- a/downscaling_gan/batch_scripts/wrapper_container.sh +++ b/downscaling_gan/batch_scripts/wrapper_container.sh @@ -4,7 +4,7 @@ ENV_SETUP_DIR=`pwd` WORKING_DIR="$(dirname "$ENV_SETUP_DIR")" EXE_DIR="$(basename "$ENV_SETUP_DIR")" -VENV_DIR=$1 +VENV_DIR=${WORKING_DIR}/$1/$1 shift # replaces $1 by $2, so that $@ does not include the name of the virtual environment anymore # sanity checks @@ -22,11 +22,16 @@ fi # Include site-packages from virtual environment... unset PYTHONPATH export PYTHONPATH=${VENV_DIR}/lib/python3.8/site-packages/:$PYTHONPATH +# ... dist-packages from container singularity... +export PYTHONPATH=/usr/local/lib/python3.8/dist-packages:$PYTHONPATH export PYTHONPATH=${WORKING_DIR}:$PYTHONPATH -#export PYTHONPATH=${WORKING_DIR}/utils:$PYTHONPATH # Control echo "%wrapper_container.sh: Check PYTHONPATH below:" echo $PYTHONPATH +# MPI related environmental variables +export PMIX_SECURITY_MODE="native" +export TF_XLA_FLAGS=--tf_xla_auto_jit=0 # disable XLA graph optimization $@ + diff --git a/downscaling_gan/env_setup/create_env.sh b/downscaling_gan/env_setup/create_env.sh index bf86d2829a720f6130cb5e9443336a9cbb542a21..59d6694f0d55edc1fb1a8ac0fa8574d8515a97c5 100755 --- a/downscaling_gan/env_setup/create_env.sh +++ b/downscaling_gan/env_setup/create_env.sh @@ -2,15 +2,16 @@ # # __authors__ = Michael Langguth # __date__ = '2021_03_25' +# __update__= '2021-11-18' # # **************** Description **************** -# This script can be used for setting up the virtual environment needed for AMBS-project -# Add the flag -lcontainer for setting up the virtual environment inside a running cotainer environment. 
+# This script can be used for setting up the virtual environment needed for downscaling with the GAN-network +# developped by Leinonen et al., 2020 (DOI: https://doi.org/10.1109/TGRS.2020.3032790) # **************** Description **************** # -### auxiliary-function ### +### auxiliary-function S ### check_argin() { -# Handle input arguments and check if one is equal to -lcontainer +# Handle input arguments and check if one is equal to -lcontainer (not needed currently) # Can also be used to check for non-positional arguments (such as -exp_id=*, see commented lines) for argin in "$@"; do # if [[ $argin == *"-exp_id="* ]]; then @@ -23,105 +24,84 @@ check_argin() { bool_container=0 fi } -### auxiliary-function ### +### auxiliary-function E ### -# some first sanity checks -if [[ ${BASH_SOURCE[0]} == ${0} ]]; then - echo "ERROR: 'create_env.sh' must be sourced, i.e. execute by prompting 'source create_env.sh [virt_env_name]'" +### MAIN S ### +#set -eu # enforce abortion if a command is not re + +SCR_SETUP="%create_env.sh: " + +## some first sanity checks +# script is sourced? +if [[ ${BASH_SOURCE[0]} == "${0}" ]]; then + echo "${SCR_SETUP}ERROR: 'create_env.sh' must be sourced, i.e. execute by prompting 'source create_env.sh [virt_env_name]'" exit 1 fi + # from now on, just return if something unexpected occurs instead of exiting # as the latter would close the terminal including logging out if [[ -z "$1" ]]; then - echo "ERROR: Provide a name to set up the virtual environment, i.e. execute by prompting 'source create_env.sh [virt_env_name]" + echo "${SCR_SETUP}ERROR: Provide a name to set up the virtual environment, i.e. execute by prompting 'source create_env.sh [virt_env_name]" return fi # set some variables -SCR_NAME="%create_env.sh:" -HOST_NAME=`hostname` +HOST_NAME=$(hostname) ENV_NAME=$1 -ENV_SETUP_DIR=`pwd` -WORKING_DIR="$(dirname "$ENV_SETUP_DIR")" -EXE_DIR="$(basename "$ENV_SETUP_DIR")" -ENV_DIR_BASE=${WORKING_DIR}/virtual_envs/ -ENV_DIR=${ENV_DIR_BASE}/${ENV_NAME} +SETUP_DIR=$(pwd) +SETUP_DIR_NAME="$(basename "${SETUP_DIR}")" +BASE_DIR="$(dirname "${SETUP_DIR}")" +VENV_DIR="${BASE_DIR}/virtual_envs/${ENV_NAME}" +TF_CONTAINER="${SETUP_DIR}/tensorflow_21.09-tf1-py3.sif" ## perform sanity checks -# Check if singularity is running -if [[ -z "${SINGULARITY_NAME}" ]]; then - echo "${SCR_NAME} ERROR: create_env.sh must be executed in a running singularity on Juwels - in conjuction with container-usage." - echo "${SCR_NAME} Thus, execute 'singularity shell [my_docker_image]' first!" - return -fi - -# further sanity checks: -# * ensure execution from env_setup-directory +# * ensure availability of singularity container +# * check if script is called from env_setup-directory # * check if virtual env has already been set up +# Check if the required TF1.15-container is available + if [[ ! -f "${TF_CONTAINER}" ]]; then + echo "${SCR_SETUP}ERROR: Could not found required TensorFlow 1.15-container under ${TF_CONTAINER}" + return + fi -if [[ "${EXE_DIR}" != "env_setup" ]]; then - echo "${SCR_NAME} ERROR: Execute 'create_env.sh' from the env_setup-subdirectory only!" +# script is called from env_setup-directory? +if [[ "${SETUP_DIR_NAME}" != "env_setup" ]]; then + echo "${SCR_SETUP}ERROR: Execute 'create_env.sh' from the env_setup-subdirectory only!" + echo ${SETUP_DIR_NAME} return fi -if [[ -d ${ENV_DIR} ]]; then - echo "${SCR_NAME} Virtual environment has already been set up under ${ENV_DIR} and is ready to use." +# virtual environment already set-up? 
+if [[ -d ${VENV_DIR} ]]; then + echo "${SCR_SETUP}Virtual environment has already been set up under ${VENV_DIR} and is ready to use." echo "NOTE: If you wish to set up a new virtual environment, delete the existing one or provide a different name." - ENV_EXIST=1 else ENV_EXIST=0 fi -## check integratability of modules +## check integratability of operating system if [[ "${HOST_NAME}" == hdfml* || "${HOST_NAME}" == *jwlogin* ]]; then # unset PYTHONPATH to ensure that system-realted paths are not set (container-environment should be used only) unset PYTHONPATH else - echo "${SCR_NAME} ERROR: Model only runs on HDF-ML and Juwels (Booster) so far." + echo "${SCR_SETUP}ERROR: Model only runs on HDF-ML and Juwels (Booster) so far." return fi ## set up virtual environment if [[ "$ENV_EXIST" == 0 ]]; then - # Activate virtual environment and install additional Python packages. - echo "${SCR_NAME} Configuring and activating virtual environment on ${HOST_NAME}" - - VIRTUAL_ENV_TOOL=${ENV_DIR_BASE}/virtualenv-\*dist-info - if ! ls $VIRTUAL_ENV_TOOL 1> /dev/null 2<&1; then - if [[ ! -d ${ENV_DIR_BASE} ]]; then - mkdir "${ENV_DIR_BASE}" - fi - echo "${SCR_NAME} Install virtualenv in base directory for virtual environments ${ENV_DIR_BASE}" - pip install --target="${ENV_DIR_BASE}" virtualenv - fi - # create virtual environment and install missing required packages - cd "${ENV_DIR_BASE}" - echo "${SCR_NAME} Create and activate virtual environment ${ENV_NAME}..." - python -m virtualenv -p /usr/bin/python --system-site-packages "${ENV_NAME}" - cd - + # Install virtualenv-package and set-up virtual environment with required additional Python packages. + echo "${SCR_SETUP}Configuring and activating virtual environment on ${HOST_NAME}" - activate_virt_env=${ENV_DIR}/bin/activate - source "${activate_virt_env}" - echo "${SCR_NAME} Start installing additional Python modules with pip..." - req_file=${ENV_SETUP_DIR}/requirements_container.txt - pip3 install --no-cache-dir -r "${req_file}" - # expand PYTHONPATH... - export PYTHONPATH=${WORKING_DIR}:$PYTHONPATH >> "${activate_virt_env}" - export PYTHONPATH=${ENV_DIR}/lib/python3.8/site-packages/:$PYTHONPATH >> "${activate_virt_env}" - # ...and ensure that this also done when the - echo "" >> "${activate_virt_env}" - echo "# Expand PYTHONPATH..." >> "${activate_virt_env}" - echo "export PYTHONPATH=${WORKING_DIR}:\$PYTHONPATH" >> "${activate_virt_env}" - echo "export PYTONPATH=${ENV_DIR}/lib/python3.8/site-packages/:\$PYTHONPATH" >> "${activate_virt_env}" + singularity exec --nv "${TF_CONTAINER}" ./install_venv_container.sh "${VENV_DIR}" - if [[ -f "${activate_virt_env}" ]]; then - echo "${SCR_NAME} Virtual environment ${ENV_DIR} has been set up successfully." - # finally, deactivate virtual environment and clean up loaded modules - deactivate - else - echo "${SCR_NAME} ERROR: Cretaion of virtual environment was not successful. Check for preceiding error-messages!" - fi + info_str="Virtual environment ${VENV_DIR} has been set up successfully." +elif [[ "$ENV_EXIST" == 1 ]]; then + # simply activate virtual environment + info_str="Virtual environment ${VENV_DIR} has already been set up before. Nothing to be done." 
fi + +echo "${SCR_SETUP}${info_str}" +### MAIN E ### diff --git a/downscaling_gan/env_setup/install_venv_container.sh b/downscaling_gan/env_setup/install_venv_container.sh new file mode 100755 index 0000000000000000000000000000000000000000..7f18e748ac1363138de0e1cbe348d7eea2b8a58f --- /dev/null +++ b/downscaling_gan/env_setup/install_venv_container.sh @@ -0,0 +1,78 @@ +#!/usr/bin/env bash +# +# __authors__ = Bing Gong, Michael Langguth +# __date__ = '2021-10-28' +# __last_update__ = '2021-11-18' by Michael Langguth +# +# **************** Description **************** +# This auxiliary script sets up the virtual environment within a singularity container. +# **************** Description **************** + +SCR_INSTALL="%install_venv_container.sh: " # this script +# set some basic variables +BASE_DIR=$(pwd) +VENV_DIR=$1 +VENV_NAME="$(basename "${VENV_DIR}")" +VENV_BASE="$(dirname "${VENV_DIR}")" + +# sanity checks +# check if we are running in a container +if [ -z "${SINGULARITY_NAME}" ]; then + echo "${SCR_INSTALL}ERROR: install_venv_container.sh must be called within a running singularity container." + return +fi + +# check if directory to virtual environment is parsed +if [ -z "$1" ]; then + echo "${SCR_INSTALL}ERROR: Provide a name to set up the virtual environment." + exit +fi + +# check if virtual environment is not already existing +if [ -d "$1" ]; then + echo "${SCR_INSTALL}ERROR: Target directory of virtual environment ${1} already exists. Choose another directory path." + exit +fi + +# check for requirement-file +if [ ! -f "${BASE_DIR}/requirements_container.txt" ]; then + echo "${SCR_INSTALL}ERROR: Cannot find requirement-file ${BASE_DIR}/requirements_container.txt to set up virtual environment." + exit +fi + +# remove dependancies from system packages +export PYTHONPATH= + +# create basic target directory for virtual environment +if ! [[ -d "${VENV_BASE}" ]]; then + mkdir "${VENV_BASE}" + # Install virtualenv in this directory + echo "${SCR_INSTALL}Installing virtualenv under ${VENV_BASE}..." + pip install --target="${VENV_BASE}/" virtualenv + # Change into the base-directory of virtual environments... + cd "${VENV_BASE}" || return +else + # Change into the base-directory of virtual environments... + cd "${VENV_BASE}" || return + if ! python -m virtualenv --version >/dev/null; then + echo "${SCR_INSTALL}ERROR: Base directory for virtual environment exists, but virtualenv-module is unavailable." + exit + fi + echo "${SCR_INSTALL}Virtualenv is already installed." +fi +# ... and set-up virtual environment therein +#python -m virtualenv -p /usr/bin/python "${VENV_NAME}" +python -m virtualenv "${VENV_NAME}" +# Activate virtual environment and install required packages +echo "${SCR_INSTALL}Activating virtual environment ${VENV_NAME} to install required Python modules..." 
+source "${VENV_DIR}/bin/activate" +# set PYTHONPATH and install packages +export PYTHONPATH="/usr/local/lib/python3.8/dist-packages/" +echo 'export PYTHONPATH="/usr/local/lib/python3.8/dist-packages/"' >> "${VENV_DIR}/bin/activate" +pip install -r "${BASE_DIR}/requirements_container.txt" + +# get back to basic directory +cd "${BASE_DIR}" || exit + + + diff --git a/preprocessing/batch_scripts/preprocess_ifs_hres_data_template.sh b/preprocessing/batch_scripts/preprocess_ifs_hres_data_template.sh new file mode 100644 index 0000000000000000000000000000000000000000..d698e4d74f547abd6acab640c41ed5b749cd9f97 --- /dev/null +++ b/preprocessing/batch_scripts/preprocess_ifs_hres_data_template.sh @@ -0,0 +1,47 @@ +#!/bin/bash -x +#SBATCH --account=deepacf +#SBATCH --nodes=1 +#SBATCH --ntasks=12 +##SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task=1 +#SBATCH --output=preprocess_ifs_hres-out_data.%j +#SBATCH --error=preprocess_ifs_hres-err_data.%j +#SBATCH --time=01:00:00 +#SBATCH --gres=gpu:0 +#SBATCH --partition=devel +#SBATCH --mail-type=ALL +#SBATCH --mail-user=m.langguth@fz-juelich.de +##jutil env activate -p cjjsc42 + +######### Template identifier (don't remove) ######### +echo "Do not run the template scripts" +exit 99 +######### Template identifier (don't remove) ######### + +# Name of virtual environment +VIRT_ENV_NAME="venv_juwels" + +# Activate virtual environment if needed (and possible) +if [ -z ${VIRTUAL_ENV} ]; then + if [[ -f ../${VIRT_ENV_NAME}/bin/activate ]]; then + echo "Activating virtual environment..." + source ../${VIRT_ENV_NAME}/bin/activate + else + echo "ERROR: Cannot find requested virtual environment ${VIRT_ENV_NAME}..." + exit 1 + fi +fi +# Loading mouldes +source ../env_setup/modules_preprocess.sh + +# set variables to be parsed +src_dir=/p/scratch/deepacf/maelstrom/maelstrom_data/ifs_hres/orig/ +out_dir=/p/scratch/deepacf/maelstrom/maelstrom_data/ifs_hres/preprocessed/ + +years=( 2016 2017 2018 2019 2020 ) +months=( 4 5 6 7 8 9 ) + +srun python -m mpi4py ../scripts/preprocess_downscaling_data.py -src_dir ${src_dir} -out_dir ${out_dir} \ + -y "${years[@]}" -m "${months[@]}" + + diff --git a/preprocessing/env_setup/create_env.sh b/preprocessing/env_setup/create_env.sh new file mode 100755 index 0000000000000000000000000000000000000000..565470fef9e33fbb00f53a5c53510a719cfd9555 --- /dev/null +++ b/preprocessing/env_setup/create_env.sh @@ -0,0 +1,104 @@ +#!/usr/bin/env bash +# +# __authors__ = Michael Langguth +# __date__ = '2021_08_01' +# +# **************** Description **************** +# This script can be used for setting up the virtual environment needed for +# the downscaling application in scope of the MAELSTROM-project. +# **************** Description **************** +# +### auxiliary-function ### +check_argin() { +# Handle input arguments and check if one is equal to -lcontainer +# Can also be used to check for non-positional arguments (such as -exp_id=*, see commented lines) + for argin in "$@"; do + # if [[ $argin == *"-exp_id="* ]]; then + # exp_id=${argin#"-exp_id="} + if [[ $argin == *"-lcontainer"* ]]; then + bool_container=1 + fi + done + if [[ -z "${bool_container}" ]]; then + bool_container=0 + fi +} +### auxiliary-function ### + +# some first sanity checks +if [[ ${BASH_SOURCE[0]} == ${0} ]]; then + echo "ERROR: 'create_env.sh' must be sourced, i.e. 
execute by prompting 'source create_env.sh [virt_env_name]'" + exit 1 +fi + +# from now on, just return if something unexpected occurs instead of exiting +# as the latter would close the terminal including logging out +if [[ -z "$1" ]]; then + echo "ERROR: Provide a name to set up the virtual environment, i.e. execute by prompting 'source create_env.sh [virt_env_name]" + return +fi + +# set some variables +SCR_NAME="%create_env.sh:" +HOST_NAME=`hostname` +ENV_NAME=$1 +ENV_SETUP_DIR=`pwd` +WORKING_DIR="$(dirname "$ENV_SETUP_DIR")" +EXE_DIR="$(basename "$ENV_SETUP_DIR")" +ENV_DIR=${WORKING_DIR}/virtual_envs/${ENV_NAME} + +## perform sanity checks + +# * ensure execution from env_setup-directory +# * check if virtual env has already been set up + +if [[ "${EXE_DIR}" != "env_setup" ]]; then + echo "${SCR_NAME} ERROR: Execute 'create_env.sh' from the env_setup-subdirectory only!" + return +fi + +if [[ -d ${ENV_DIR} ]]; then + echo "${SCR_NAME} Virtual environment has already been set up under ${ENV_DIR} and is ready to use." + echo "NOTE: If you wish to set up a new virtual environment, delete the existing one or provide a different name." + + ENV_EXIST=1 +else + ENV_EXIST=0 +fi + +## check integratability of modules +if [[ "${HOST_NAME}" == hdfml* || "${HOST_NAME}" == *jwlogin* ]]; then + source modules_preprocess.sh +else + echo "${SCR_NAME} ERROR: Model only runs on HDF-ML and Juwels (Booster) so far." + return +fi + +## set up virtual environment +if [[ "$ENV_EXIST" == 0 ]]; then + # Activate virtual environment and install additional Python packages. + echo "${SCR_NAME} Configuring and activating virtual environment on ${HOST_NAME}" + + python3 -m venv ${ENV_DIR} + activate_virt_env=${ENV_DIR}/bin/activate + + source "${activate_virt_env}" + echo "${SCR_NAME} Start installing additional Python modules with pip..." + req_file=${ENV_SETUP_DIR}/requirements_preprocessing.txt + pip3 install --no-cache-dir -r "${req_file}" + + # expand PYTHONPATH... + export PYTHONPATH=${WORKING_DIR}:$PYTHONPATH >> "${activate_virt_env}" + export PYTHONPATH=${WORKING_DIR}/utils/:$PYTHONPATH >> "${activate_virt_env}" + # ...and ensure that this also done when the + echo "" >> "${activate_virt_env}" + echo "# Expand PYTHONPATH..." >> "${activate_virt_env}" + echo "export PYTHONPATH=${WORKING_DIR}:\$PYTHONPATH" >> "${activate_virt_env}" + echo "export PYTONPATH=${WORKING_DIR}/../utils/:\$PYTHONPATH" >> "${activate_virt_env}" + + if [[ -f "${activate_virt_env}" ]]; then + echo "${SCR_NAME} Virtual environment ${ENV_DIR} has been set up successfully." + else + echo "${SCR_NAME} ERROR: Creation of virtual environment was not successful. Check for preceiding error-messages!" + fi +fi diff --git a/preprocessing/env_setup/modules_preprocess.sh b/preprocessing/env_setup/modules_preprocess.sh new file mode 100755 index 0000000000000000000000000000000000000000..cdf4e0003c22062bfbd2f565c52fcbb6a36a4c96 --- /dev/null +++ b/preprocessing/env_setup/modules_preprocess.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash + +# __author__ = Michael Langguth +# __date__ = '2020_08_01' + +# This script loads the required modules for the preprocessing of IFS HRES data in scope of the +# downscaling application in scope of the MAELSTROM project on Juwels and HDF-ML. +# Note that some other packages have to be installed into a venv (see create_env.sh and requirements_preprocess.txt). 
+ +SCR_NAME_MOD="modules_preprocess.sh" +HOST_NAME=`hostname` + +echo "%${SCR_NAME_MOD}: Start loading modules on ${HOST_NAME} required for preprocessing IFS HRES data." + +ml purge +ml use $OTHERSTAGES +ml Stages/2020 + +ml GCC/9.3.0 +ml GCCcore/.9.3.0 +ml ParaStationMPI/5.4.7-1 +ml CDO/1.9.8 +ml NCO/4.9.5 +ml mpi4py/3.0.3-Python-3.8.5 +ml SciPy-Stack/2020-Python-3.8.5 +ml dask/2.22.0-Python-3.8.5 +ml TensorFlow/2.3.1-Python-3.8.5 + diff --git a/preprocessing/env_setup/requirements_preprocessing.txt b/preprocessing/env_setup/requirements_preprocessing.txt new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/preprocessing/grid_des/ifs_hres_coarsened_grid b/preprocessing/grid_des/ifs_hres_coarsened_grid new file mode 100644 index 0000000000000000000000000000000000000000..d856f0227fee6ad59b1d15f3d2ad685dad000da4 --- /dev/null +++ b/preprocessing/grid_des/ifs_hres_coarsened_grid @@ -0,0 +1,7 @@ +gridtype = lonlat +xsize = 18 +ysize = 14 +xfirst = 3.55 +xinc = 0.8 +yfirst = 54.85 +yinc = -0.8 diff --git a/preprocessing/grid_des/ifs_hres_grid_base b/preprocessing/grid_des/ifs_hres_grid_base new file mode 100644 index 0000000000000000000000000000000000000000..3dd0a4566df7ad99ea15f5d6776ce8bef6dc6659 --- /dev/null +++ b/preprocessing/grid_des/ifs_hres_grid_base @@ -0,0 +1,7 @@ +gridtype = lonlat +xsize = 144 +ysize = 112 +xfirst = 3.2 +xinc = 0.1 +yfirst = 55.3 +yinc = -0.1 diff --git a/preprocessing/grid_des/ifs_hres_grid_tar b/preprocessing/grid_des/ifs_hres_grid_tar new file mode 100644 index 0000000000000000000000000000000000000000..84a0e149ef2887c300a5de28d6c13c88ca9c9791 --- /dev/null +++ b/preprocessing/grid_des/ifs_hres_grid_tar @@ -0,0 +1,7 @@ +gridtype = lonlat +xsize = 128 +ysize = 96 +xfirst = 4. +xinc = 0.1 +yfirst = 54.5 +yinc = -0.1 diff --git a/preprocessing/scripts/coarsen_ifs_hres.sh b/preprocessing/scripts/coarsen_ifs_hres.sh new file mode 100755 index 0000000000000000000000000000000000000000..1347d9020a86e1d8840d8bb321d8480aa5371cea --- /dev/null +++ b/preprocessing/scripts/coarsen_ifs_hres.sh @@ -0,0 +1,122 @@ +#!/usr/bin/env bash + +# __author__ = Michael Langguth +# __date__ = '2021_07_23' + +# This script creates the input for the downscaling task tackled wit a Unet in the scope of the MAELSTROM project. +# The script requires that the netCDF-tools ncap2 and ncrea are available. Besides, CDO is required for remapping the data. +# For first-order conservative remapping, CDO must be installed with PROJ. + +# basic variables +scr_name="%coarsen_ifs_hres.sh" + +HOST_NAME=`hostname` + +# grid descriptions +fine_grid_base_dscr="../grid_des/ifs_hres_grid_base" +fine_grid_tar_dscr="../grid_des/ifs_hres_grid_tar" +coarse_grid_dscr="../grid_des/ifs_hres_coarsened_grid" + +# start and end coordinates of target domain (must correspond to grid descriptions!) +lon0="3.2" +lon1="17.5" +lat0="44.2" +lat1="55.3" +dx="0.8" + +# some IFS-specific parameters (obtained from Chapter 12 in http://dx.doi.org/10.21957/efyk72kl) +cpd="1004.709" +g="9.80665" + +### some sanity checks ### +if [[ ! -n "$1" ]]; then + echo "${scr_name}: Pass a path to a netCDF-file to script." +fi + +filename=$1 +filename_base="${filename%.*}" + +if ! command -v cdo &> /dev/null; then + echo "${scr_name}: CDO is not available on ${HOST_NAME}"; + return 1 +fi + +if ! command -v ncap2 &> /dev/null; then + echo "${scr_name}: ncap2 is not available on ${HOST_NAME}"; + return 1 +fi + +if ! 
command -v ncea &> /dev/null; then + echo "${scr_name}: ncea is not available on ${HOST_NAME}"; + return 1 +fi + +if [[ ! -f "$1" ]]; then + echo "${scr_name}: The file $1 does not exist."; + return 1 +fi + +if [[ ! -f ${fine_grid_base_dscr} ]]; then + echo "${scr_name}: ERROR: The basic grid description for IFS HRES ${fine_grid_base_dscr} is missing." + return 1 +fi + +if [[ ! -f ${coarse_grid_dscr} ]]; then + echo "${scr_name}: ERROR: The grid description for the coarse grid ${coarse_grid_dscr} is missing." + return 1 +fi + +if [[ ! -f ${fine_grid_tar_dscr} ]]; then + echo "${scr_name}: ERROR: The target grid description for IFS HRES ${fine_grid_tar_dscr} is missing." + return 1 +fi + +#return 0 + +### Start the work ### + +# shrink data to region of interest +filename_sd="${filename_base}_subdom.nc" +ncea -O -d time,0 -d latitude,${lat0},${lat1} -d longitude,${lon0},${lon1} $filename $filename_sd +ncrename -d latitude,lat -v latitude,lat -d longitude,lon -v longitude,lon ${filename_sd} +ncap2 -O -s "lat=double(lat); lon=double(lon)" ${filename_sd} ${filename_sd} + +# reset coordinates for later slicing +lat0="45.0" +lat1="54.5" +lon0="4.0" +lon1="16.71" + +# calculate dry static energy for first-order conservative remapping +filename_dse="${filename_base}_dse.nc" +ncap2 -O -s "s=${cpd}*t2m + z + ${g}*2" -v ${filename_sd} ${filename_dse} + +# add surface geopotential to file +ncks -A -v z $filename_sd $filename_dse + +# remap the data (first-order conservative approach) +filename_crs="${filename_base}_coarse.nc" +cdo remapcon,${coarse_grid_dscr} -setgrid,${fine_grid_base_dscr} ${filename_dse} ${filename_crs} + +# remap with extrapolation on the target high resoved grid with bilinear rempapping +filename_remapped="${filename_base}_remapped.nc" +cdo remapbil,${fine_grid_tar_dscr} -setgrid,${coarse_grid_dscr} ${filename_crs} ${filename_remapped} + +# retransform dry static energy to t2m +ncap2 -O -s "t2m_in=(s-z-${g}*2)/${cpd}" -o ${filename_remapped} ${filename_remapped} +# finally rename data to distinguish between input and target data (the later must be copied over from previous files) +ncrename -v z,z_in ${filename_remapped} +ncks -O -x -v s ${filename_remapped} ${filename_remapped} +ncea -A -d lat,${lat0},${lat1} -d lon,${lon0},${lon1} -v t2m,z ${filename_sd} ${filename_remapped} +ncrename -v t2m,t2m_tar -v z,z_tar ${filename_remapped} + +### Return and clean-up in case of success ### +if [[ -f ${filename_remapped} ]]; then + echo "${scr_name}: Processed data successfully from ${filename} to ${filename_remapped}. Cleaning-up..." + rm $filename_sd $filename_dse $filename_crs + exit 0 +else + echo "${scr_name}: Something went wrong when processing ${filename}. Check intermediate files." + exit 1 +fi + diff --git a/preprocessing/scripts/preprocess_downscaling_data.py b/preprocessing/scripts/preprocess_downscaling_data.py new file mode 100755 index 0000000000000000000000000000000000000000..04d24589daa551fa8bcd130a627fbd0e856e437c --- /dev/null +++ b/preprocessing/scripts/preprocess_downscaling_data.py @@ -0,0 +1,153 @@ +# ********** Info ********** +# @Creation: 2021-08-01 +# @Update: 2021-08-01 +# @Author: Michael Langguth +# @Site: Juelich supercomputing Centre (JSC) @ FZJ +# @File: preproces_downscaling_data.py +# ********** Info ********** + +# doc-string +""" +Main script to preprocess IFS HRES data for downscaling with UNet-architecture. 
+""" +# doc-string + +import os, glob +import shutil +import argparse +import logging +import subprocess as sp +import datetime as dt +from tfrecords_utils import IFS2TFRecords +from pystager_utils import PyStager + +scr_name = "preprocess_downsclaing_data" + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--source_parent_dir", "-src_dir", dest="src_dir", type=str, + default="/p/scratch/deepacf/maelstrom/maelstrom_data/ifs_hres", + help="Top-level directory under which IFS HRES data are stored with subdirectories " + + "<year>/<month>.") + parser.add_argument("--out_parent_dir", "-out_dir", dest="out_dir", type=str, required=True, + help="Top-level directory under which remapped data will be stored.") + parser.add_argument("--years", "-y", dest="years", type=int, nargs="+", default=[2016, 2017, 2018, 2019, 2020], + help="Years of data to be preprocessed.") + parser.add_argument("--months", "-m", dest="months", type=int, nargs="+", default=range(3, 10), + help="Months of data to be preprocessed.") + + args = parser.parse_args() + dir_in = args.src_dir + dir_out = args.out_dir + years = args.years + months = args.months + + if not os.path.isdir(dir_in): + raise NotADirectoryError("%{0}: Parsed source directory does not exist.".format(scr_name)) + + if not os.path.isdir(dir_out): + os.makedirs(dir_out, exist_ok=True) + print("%{0}: Created output-directory for remapped data '{1}'".format(scr_name, dir_out)) + + ifs_hres_pystager = PyStager(preprocess_worker, "year_month_list", nmax_warn=3) + ifs_hres_pystager.setup(years, months) + ifs_hres_pystager.run(dir_in, dir_out) + + +def preprocess_worker(year_months: list, dir_in: str, dir_out: str, logger: logging.Logger, + nmax_warn: int = 3, hour: int = None): + """ + Function that runs job of an individual worker. + :param year_months: Datetime-objdect indicating year and month for which data should be preprocessed + :param dir_in: Top-level input directory for original IFS HRED netCDF-files + :param dir_out: Top-level output directory wheer netCDF-files and TFRecords of remapped data will be stored + :param logger: Logging instance for log process on worker + :param nmax_warn: allowed maximum number of warnings/problems met during processing (default:3) + :param hour: hour of the dy for which data should be preprocessed (default: None) + :return: number of warnings/problems met during processing (if they do not trigger an error) + """ + method = preprocess_worker.__name__ + + nwarns = 0 + this_dir = os.path.dirname(os.path.realpath(__file__)) + + for year_month in year_months: + assert isinstance(year_month, dt.datetime),\ + "%{0}: All year_months-argument must be a datetime-object. 
Current one is of type '{1}'"\ + .format(method, type(year_month)) + + year, month = int(year_month.strftime("%Y")), int(year_month.strftime("%m")) + year_str, month_str = str(year), "{0:02d}".format(int(month)) + hh_str = "*[0-2][0-9]" if hour is None else "{0:02d}".format(int(hour)) + + subdir = year_month.strftime("%Y-%m") + dirr_curr = os.path.join(dir_in, str(year), subdir) + dest_nc_dir = os.path.join(dir_out, "netcdf_data", year_str, subdir) + os.makedirs(dest_nc_dir, exist_ok=True) + + assert isinstance(logger, logging.Logger), "%{0}: logger-argument must be a logging.Logger instance"\ + .format(method) + + if not os.path.isdir(dirr_curr): + err_mess = "%{0}: Could not find directory '{1}'".format(method, dirr_curr) + logger.critical(err_mess) + raise NotADirectoryError(err_mess) + + search_patt = os.path.join(dirr_curr, "sfc_{0}{1}*_{2}.nc".format(year_str, month_str, hh_str)) + logger.info("%{0}: Serach for netCDF-files under '{1}' for year {2}, month {3} and hour {4}" + .format(method, dirr_curr, year_str, month_str, hh_str)) + nc_files = glob.glob(search_patt) + + if not nc_files: + err_mess = "%{0}: Could not find any netCDF-file in '{1}' with search pattern '{2}'"\ + .format(method, dirr_curr, search_patt) + logger.critical(err_mess) + raise FileNotFoundError(err_mess) + + nfiles = len(nc_files) + logger.info("%{0}: Found {1:d} files under '{2}' for preprocessing.".format(method, nfiles, dirr_curr)) + nwarns = 0 + # Perform remapping + for i, nc_file in enumerate(nc_files): + logger.info("%{0}: Start remapping of data from file '{1}' ({2:d}/{3:d})" + .format(method, nc_file, i+1, nfiles)) + cmd = "{0} {1}".format(os.path.join(this_dir, "coarsen_ifs_hres.sh"), nc_file) + try: + #logger.info("%{0}: Processing of netCDF-files already done.".format(method)) + _ = sp.check_output(cmd, shell=True) + nc_file_new = os.path.basename(nc_file).replace(".nc", "_remapped.nc") + shutil.move(nc_file.replace(".nc", "_remapped.nc"), os.path.join(dest_nc_dir, nc_file_new)) + logger.info("%{0} Data has been remapped successfully and moved to '{1}'-directory." + .format(method, dest_nc_dir)) + except Exception as err: + nwarns += 1 + logger.debug("%{0}: A problem was faced when handling file '{1}'.".format(method, nc_file) + + "Remapping of this file presumably failed.") + if nwarns > nmax_warn: + logger.critical("%{0}: More warnings triggered than allowed ({1:d}). ".format(method, nmax_warn) + + "Job will be trerminated and see error below.") + raise err + else: + pass + + # move remapped data to own directory + tfr_data_dir = os.path.join(dir_out, "tfr_data") + ifs_tfr = IFS2TFRecords(tfr_data_dir, os.path.join(dest_nc_dir, os.path.basename(nc_files[0]) + .replace(".nc", "_remapped.nc"))) + ifs_tfr.get_and_write_metadata() + logger.info("%{0}: IFS2TFRecords-class instance has been initalized successully.".format(method)) + try: + ifs_tfr.write_monthly_data_to_tfr(dest_nc_dir, patt="*remapped.nc") + except Exception as err: + logger.critical("%{0}: Error when writing TFRecord-file. 
Investigate error-message below.".format(method)) + raise err + + logger.info("%{0}: TFRecord-files have been created succesfully under '{1}'".format(method, tfr_data_dir)) + logger.info("%{0}: During processing {1:d} warnings have been faced.".format(method, nwarns)) + + return nwarns + + +if __name__ == "__main__": + main() diff --git a/utils/helper_utils.py b/utils/helper_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..aba57f9dc6c9ab6af92071b7f90792bc7106184f --- /dev/null +++ b/utils/helper_utils.py @@ -0,0 +1,85 @@ +# ********** Info ********** +# @Creation: 2021-07-28 +# @Update: 2021-07-30 +# @Author: Michael Langguth +# @Site: Juelich supercomputing Centre (JSC) @ FZJ +# @File: helper.py +# ********** Info ********** + +""" +A collection of auxiliary functions. + +The following functions are provided: + * ensure_datetime + * extract_date + * subset_files_on_date +""" + +import os +import numpy as np +import pandas as pd +import datetime as dt +from dateutil.parser import parse as date_parser + + +def ensure_datetime(date): + """ + Tries to convert date which can be everything that can be processed by pandas' to_datetime-method + into a datetime.datetime-object. + :param date: Any date that can be handled by to_datetime + :param: Same as date, but as datetime.datetime-onject + """ + method = ensure_datetime.__name__ + + if isinstance(date, dt.datetime): + date_dt = date + else: + try: + date_dt = pd.to_datetime(date).to_pydatetime() + except Exception as err: + print("%{0}: Could not handle input date (as string: {1}, type: {2}).".format(method, str(date), type(date))) + raise err + + return date_dt + + +def extract_date(date_str): + """ + Checks if a datetime-object can be extracted from a given string. + Based on dateutil.parser.parse. + :param date_str: Any string containing some date + :return: A corresponding datetime object + """ + method = extract_date.__name__ + + assert isinstance(date_str, str), "Input must be a string." + try: + date_extracted = date_parser(date_str, fuzzy=True) + except Exception as err: + print("%{0}: Could not extract date from '{1}'. Investigate raised error".format(method, date_str)) + raise err + return date_extracted + + +def subset_files_on_date(all_files_list: list, val: int, filter_basedir: bool = False, date_alias: str = "H"): + """ + Subsets a list of files based on a time-pattern that must be part of the filename. + :param all_files_list: list of all files + :param val: time value (default meaning: hour of the day, see date_alias) + :param filter_basedir: flag for removing base-directory when subsetting, e.g. 
when dates are present in basedir + :param date_alias: also known as offset alias in pandas + (see https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timeseries-offset-aliases) + """ + method = subset_files_on_date.__name__ + + if filter_basedir: + all_files_dates = [(extract_date(os.path.dirname(dfile))).strftime(date_alias) for dfile in all_files_list] + else: + all_files_dates = [(extract_date(dfile)).strftime(date_alias) for dfile in all_files_list] + inds = [idx for idx, s in enumerate(all_files_dates) if "{0:02d}".format(int(val)) in s] + + if not inds: + raise ValueError("%{0}: Could not find any file carrying the value of {1:02d} with date alias {2}" + .format(method, val, date_alias)) + else: + return list(np.asarray(all_files_list)[inds]) \ No newline at end of file diff --git a/utils/pystager_utils.py b/utils/pystager_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..5bd70f3c976865e0244f7c9b24f58e0404eb0db0 --- /dev/null +++ b/utils/pystager_utils.py @@ -0,0 +1,482 @@ +# ********** Info ********** +# @Creation: 2021-07-25 +# @Update: 2021-07-27 +# @Author: Michael Langguth, based on work by Amirpasha Mozaffari +# @Site: Juelich supercomputing Centre (JSC) @ FZJ +# @File: pystager_utils.py +# ********** Info ********** + +import sys, os +# The script must be executed with the mpi4py-module to ensure that the job gets aborted when an error is risen +# see https://mpi4py.readthedocs.io/en/stable/mpi4py.run.html +if "mpi4py" not in sys.modules: + raise ModuleNotFoundError("Python-script must be called with the mpi4py module, i.e. 'python -m mpi4py <...>.py") +import multiprocessing +import subprocess +import inspect +from typing import Union, List +from mpi4py import MPI +import logging +import numpy as np +import pandas as pd +import datetime as dt +import platform + + +class Distributor(object): + """ + Class for defining (customized) distributors. The distributor selected by the distributor_engine must provide + the dynamical arguments for a parallelized job run by PyStager (see below) which inherits from this class. + """ + class_name = "Distributor" + + def __init__(self, distributor_name): + self.distributor_name = distributor_name + + def distributor_engine(self, distributor_name: str): + """ + Sets up distributor for organinzing parallelization. 
+ :param distributor_name: Name of distributor + :return distributor: selected callable distributor + """ + method = "{0}->{1}".format(Distributor.class_name, Distributor.distributor_engine.__name__) + + if distributor_name.lower() == "date": + distributor = self.distributor_date + elif distributor_name.lower() == "year_month_list": + distributor = self.distributor_year_month + else: + raise ValueError("%{0}: The distributor named {1} is not implemented yet.".format(method, distributor_name)) + + return distributor + + def distributor_date(self, date_start: dt.datetime, date_end: dt.datetime, freq: str = "1D"): + """ + Creates a transfer dictionary whose elements are lists for individual start and end dates for each processor + param date_start: first date to convert + param date_end: last date to convert + return: transfer_dictionary allowing date-based parallelization + """ + method = "{0}->{1}".format(Distributor.class_name, Distributor.distributor_date.__name__) + + # sanity checks + if not (isinstance(date_start, dt.datetime) and isinstance(date_end, dt.datetime)): + raise ValueError("%{0}: date_start and date_end have to datetime objects!".format(method)) + + if not (date_start.strftime("%H") == "00" and date_end.strftime("%H") == "00"): + raise ValueError("%{0}: date_start and date_end must be valid at 00 UTC.".format(method)) + + if not int((date_end - date_start).days) >= 1: + raise ValueError("%{0}: date_end must be at least one day after date_start.".format(method)) + + if not hasattr(self, "num_processes"): + print("%{0}: WARNING: Attribute num_processes is not set and thus no parallelization will take place.") + num_processes = 2 + else: + num_processes = self.num_processes + + # init transfer dictionary (function create_transfer_dict_from_list does not work here since transfer dictionary + # consists of tuple with start and end-date instead of a number of elements) + transfer_dict = dict.fromkeys(list(range(1, num_processes))) + + dates_req_all = pd.date_range(date_start, date_end, freq=freq) + ndates = len(dates_req_all) + ndates_per_node = int(np.ceil(float(ndates)/(num_processes-1))) + + for node in np.arange(num_processes): + ind_max = np.minimum((node+1)*ndates_per_node-1, ndates-1) + transfer_dict[node+1] = [dates_req_all[node*ndates_per_node], + dates_req_all[ind_max]] + if ndates-1 == ind_max: + break + + return transfer_dict + + def distributor_year_month(self, years, months): + + method = "{0}->{1}".format(Distributor.class_name, Distributor.distributor_year_month.__name__) + + list_or_int = Union[List, int] + + assert isinstance(years, list_or_int.__args__), "%{0}: Input years must be list of years or an integer."\ + .format(method) + assert isinstance(months, list_or_int.__args__), "%{0}: Input months must be list of months or an integer."\ + .format(method) + + if not hasattr(self, "num_processes"): + print("%{0}: WARNING: Attribute num_processes is not set and thus no parallelization will take place.") + num_processes = 2 + else: + num_processes = self.num_processes + + if isinstance(years, int): years = [years] + if isinstance(months, int): months = [months] + + all_years_months = [] + for year in years: + for month in months: + all_years_months.append(dt.datetime.strptime("{0:d}-{1:02d}".format(int(year), int(month)), "%Y-%m")) + + transfer_dict = Distributor.create_transfer_dict_from_list(all_years_months, num_processes) + + return transfer_dict + + @staticmethod + def create_transfer_dict_from_list(in_list: List, num_procs: int): + """ + Splits up list to 
transfer dictionary for PyStager. + :param in_list: list of elements that can be distributed to workers + :param num_procs: Number of workers that are avaialable to operate on elements of list + :return: transfer dictionary for PyStager + """ + + method = Distributor.create_transfer_dict_from_list.__name__ + + assert isinstance(in_list, list), "%{0} Input argument in_list must be a list, but is of type '{1}'."\ + .format(method, type(in_list)) + + assert int(num_procs) >= 2, "%{0}: Number of processes must be at least two.".format(method) + + nelements = len(in_list) + nelements_per_node = int(np.ceil(float(nelements)/(num_procs-1))) + + transfer_dict = dict.fromkeys(list(range(1, num_procs))) + + for i, element in enumerate(in_list): + ind = i % (num_procs-1) + 1 + if i < num_procs: + transfer_dict[ind] = [element] + else: + transfer_dict[ind].append(element) + + print(transfer_dict) + + return transfer_dict + + +class PyStager(Distributor): + """ + Organizes parallelized execution of a job. + The job must be wrapped into a function-object that can be fed with dynamical arguments provided by an available + distributor and static arguments (see below). + Running PyStager constitutes a three-step approach. First PyStager must be instanciated, then it must be set-up by + calling the setup-method and finally, the job gets executed in a parallelized manner. + Example: Let the function 'process_data' be capable to process hourly data files between date_start and date_end. + Thus, parallelization can be organized with distributor_date which only must be fed with a start and end + date (the freq-argument is optional and defaults to "1D" -> daily frequency (see pandas)). + With the data being stored under <data_dir>, PyStager can be called in a Python-script by: + pystager_obj = PyStager(process_data, "date") + pystager_obj.setup(<start_datetime_obj>, <end_datetime_obj>, freq="1H") + pystager_obj.run(<static_arguments>) + By splitting up the setup-method from the execution, multiple job executions becomes possible. + """ + + class_name = "PyStager" + + def __init__(self, job_func: callable, distributor_name: str, nmax_warn: int = 3, logdir: str = None): + """ + Initialize PyStager. + :param job_func: Function whose execution is meant to be parallelized. This function must accept arguments + dynamical arguments provided by the distributor (see distributo_engine-method) and + static arguments (see run-method) in the order mentioned here. Additionally, it must accept + a logger instance. The argument 'nmax_warn' is optional. + :param distributor_name: Name of distributor which takes care for the paralelization (see distributo_engine + -method) + :param nmax_warn: Maximal number of accepted warnings during job execution (default: 3) + :param logdir: directory where logfile are stored (current working directory becomes the default if not set) + """ + super().__init__(distributor_name) + method = PyStager.__init__.__name__ + + self.cpu_name = platform.processor() + self.num_cpus_max = multiprocessing.cpu_count() + self.distributor = self.distributor_engine(distributor_name) + self.logdir = PyStager.set_and_check_logdir(logdir, distributor_name) + self.nmax_warn = int(nmax_warn) + self.job = job_func + self.transfer_dict = None + self.comm = MPI.COMM_WORLD + self.my_rank = self.comm.Get_rank() + self.num_processes = self.comm.Get_size() + + if not callable(self.job): + raise ValueError("%{0}: Passed function to be parallelized must be a callable function for {1}." 
+
+
+class PyStager(Distributor):
+    """
+    Organizes the parallelized execution of a job.
+    The job must be wrapped into a function-object that can be fed with dynamic arguments provided by an available
+    distributor and static arguments (see below).
+    Running PyStager constitutes a three-step approach: First, PyStager must be instantiated; then it must be set up
+    by calling the setup-method; finally, the job gets executed in a parallelized manner.
+    Example: Let the function 'process_data' be capable of processing hourly data files between date_start and
+             date_end. Thus, parallelization can be organized with distributor_date which only needs to be fed with
+             a start and an end date (the freq-argument is optional and defaults to "1D" -> daily frequency,
+             see pandas). With the data being stored under <data_dir>, PyStager can be called in a Python-script by:
+                 pystager_obj = PyStager(process_data, "date")
+                 pystager_obj.setup(<start_datetime_obj>, <end_datetime_obj>, freq="1H")
+                 pystager_obj.run(<static_arguments>)
+    By splitting up the setup-method from the execution, multiple job executions become possible.
+    """
+
+    class_name = "PyStager"
+
+    def __init__(self, job_func: callable, distributor_name: str, nmax_warn: int = 3, logdir: str = None):
+        """
+        Initialize PyStager.
+        :param job_func: Function whose execution is meant to be parallelized. This function must accept the dynamic
+                         arguments provided by the distributor (see distributor_engine-method) and the static
+                         arguments (see run-method) in the order mentioned here. Additionally, it must accept
+                         a logger instance. The argument 'nmax_warn' is optional.
+        :param distributor_name: Name of the distributor which takes care of the parallelization
+                                 (see distributor_engine-method)
+        :param nmax_warn: Maximal number of accepted warnings during job execution (default: 3)
+        :param logdir: directory where log files are stored (current working directory becomes the default if not set)
+        """
+        super().__init__(distributor_name)
+        method = PyStager.__init__.__name__
+
+        self.cpu_name = platform.processor()
+        self.num_cpus_max = multiprocessing.cpu_count()
+        self.distributor = self.distributor_engine(distributor_name)
+        self.logdir = PyStager.set_and_check_logdir(logdir, distributor_name)
+        self.nmax_warn = int(nmax_warn)
+        self.job = job_func
+        self.transfer_dict = None
+        self.comm = MPI.COMM_WORLD
+        self.my_rank = self.comm.Get_rank()
+        self.num_processes = self.comm.Get_size()
+
+        if not callable(self.job):
+            raise ValueError("%{0}: Passed function to be parallelized must be a callable function for {1}."
+                             .format(method, PyStager.class_name))
+
+        if self.nmax_warn <= 0:
+            raise ValueError("%{0}: nmax_warn must be larger than zero for {1}, but is set to {2:d}"
+                             .format(method, PyStager.class_name, self.nmax_warn))
+
+        if self.num_processes < 2:
+            raise ValueError("%{0}: Number of assigned MPI processes must be at least two for {1}."
+                             .format(method, PyStager.class_name))
+
+    def setup(self, *args):
+        """
+        Simply passes the arguments to the initialized distributor (only effective on the master process).
+        :param args: Tuple of arguments suitable for the distributor (self.distributor)
+        """
+        method = PyStager.setup.__name__
+
+        if self.my_rank == 0:
+            try:
+                self.transfer_dict = self.distributor(*args)
+            except Exception as err:
+                print("%{0}: Failed to set up transfer dictionary of PyStager (see raised error below).".format(method))
+                raise err
+        else:
+            pass
+
+    # def run(self, data_dir, *args, job_name="dummy"):
+    def run(self, *args, job_name="dummy"):
+        """
+        Run PyStager: the master process distributes the work packages from the transfer dictionary,
+        while the worker processes execute the job on them.
+        """
+        method = "{0}->{1}".format(PyStager.class_name, PyStager.run.__name__)
+
+        if self.my_rank == 0 and self.transfer_dict is None:
+            raise AttributeError("%{0}: transfer_dict is still None. Call setup beforehand!".format(method))
+
+        # if not os.path.isdir(data_dir):
+        #     raise NotADirectoryError("%{0}: The passed data directory '{1}' does not exist.".format(method, data_dir))
+
+        if self.my_rank == 0:
+            logger_main = os.path.join(self.logdir, "{0}_job_main.log".format(job_name))
+            if os.path.exists(logger_main):
+                print("%{0}: Main logger file '{1}' already exists and will be deleted.".format(method, logger_main))
+                os.remove(logger_main)
+
+            logging.basicConfig(filename=logger_main, level=logging.DEBUG,
+                                format="%(asctime)s:%(levelname)s:%(message)s")
+            logger = logging.getLogger(__file__)
+            logger.addHandler(logging.StreamHandler(sys.stdout))
+
+            logger.info("PyStager is started at {0}".format(dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S UTC")))
+
+            # distribute work to worker processes
+            for proc in range(1, self.num_processes):
+                broadcast_list = self.transfer_dict[proc]
+                self.comm.send(broadcast_list, dest=proc)
+
+            stat_mpi = self.manage_recv_mess(logger)
+
+            if stat_mpi:
+                logger.info("Job has been executed successfully on {0:d} worker processes. Job exits normally at {1}"
+                            .format(self.num_processes - 1, dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S UTC")))
+        else:
+            # worker logger file
+            logger_worker = os.path.join(self.logdir, "{0}_job_worker_{1}.log".format(job_name, self.my_rank))
+            if os.path.exists(logger_worker):
+                os.remove(logger_worker)
+
+            logging.basicConfig(filename=logger_worker, level=logging.DEBUG,
+                                format='%(asctime)s:%(levelname)s:%(message)s')
+            logger = logging.getLogger(__file__)
+            logger.addHandler(logging.StreamHandler(sys.stdout))
+            logger.info("============== Worker logger is activated ==============")
+            logger.info("Start receiving message from master...")
+
+            stat_worker = self.manage_worker_jobs(logger, *args)
+
+        MPI.Finalize()
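+    # Note on the master-worker protocol (added for clarity, not part of the original code):
+    # each worker replies to the master with a tuple (message, warning_count) whose message starts
+    # with a five-character status code followed by the worker's rank, e.g. roughly
+    #   ("IDLEE4: Worker 4 is idle", 0)                           -> no work was assigned
+    #   ("PASSS3:was finished", 0)                                -> job finished successfully
+    #   ("WARNN2: Several warnings (2) have been triggered ", 2)  -> non-fatal issues occurred
+    #   ("ERROR1: Failure was triggered.", 9999)                  -> fatal error, abort
+    # manage_recv_mess below decodes these status codes and aborts once nmax_warn is exceeded.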
+    def manage_recv_mess(self, logger):
+        """
+        Manages received messages from the worker processes. Also accumulates warnings and aborts the job if their
+        maximum number is exceeded.
+        :param logger: logger instance to add logs according to the messages received from the workers
+        :return stat: True if ok, else False
+        """
+        method = "{0}->{1}".format(PyStager.class_name, PyStager.manage_recv_mess.__name__)
+
+        assert isinstance(self.comm, MPI.Intracomm), "%{0}: comm must be an MPI Intracomm-instance, but is of type '{1}'"\
+                                                     .format(method, type(self.comm))
+
+        assert isinstance(logger, logging.Logger), "%{0}: logger must be a Logger-instance, but is of type '{1}'"\
+                                                   .format(method, type(logger))
+
+        message_counter = 1
+        warn_counter = 0
+        lexit = False
+        while message_counter < self.num_processes:
+            mess_in = self.comm.recv()
+            worker_stat = mess_in[0][:5]
+            worker_num = mess_in[0][5:7]
+            worker_str = "Worker with ID {0}".format(worker_num)
+            # check worker status
+            if worker_stat == "IDLEE":
+                logger.info("{0} is idle. Nothing to do.".format(worker_str))
+            elif worker_stat == "PASSS":
+                logger.info("{0} has finished the job successfully.".format(worker_str))
+            elif worker_stat == "WARNN":
+                warn_counter += int(mess_in[1])
+                logger.warning("{0} has seen a non-fatal error/warning. Number of warnings is now {1:d}."
+                               .format(worker_str, warn_counter))
+            elif worker_stat == "ERROR":
+                logger.critical("{0} met a fatal error. System will be terminated.".format(worker_str))
+                lexit = True
+            else:
+                logger.critical("{0} has sent an unknown message: '{1}'. System will be terminated."
+                                .format(worker_str, worker_stat))
+                lexit = True
+            # sum of warnings exceeds allowed maximum
+            if warn_counter > self.nmax_warn:
+                logger.critical("Number of allowed warnings exceeded. Job will be terminated...")
+                lexit = True
+
+            if lexit:
+                logger.critical("Job is shut down now.")
+                raise RuntimeError("%{0}: Distributed jobs could not be run properly.".format(method))
+
+            message_counter += 1
+
+        return True
+
+    def manage_worker_jobs(self, logger, *args):
+        """
+        Manages the worker processes and runs the job with the passed arguments.
+        Receives the work package from the master process and replies with a tuple of a return-message and a
+        worker status.
+        :param logger: logger instance to add logs according to the message received from the master and from the
+                       parallelized job
+        :param args: the arguments passed to the parallelized job (see self.job in __init__)
+        :return stat: True if ok, else False
+        """
+        method = "{0}->{1}".format(PyStager.class_name, PyStager.manage_worker_jobs.__name__)
+
+        worker_stat_fail = 9999
+
+        # sanity checks
+        assert isinstance(self.comm, MPI.Intracomm), "%{0}: comm must be an MPI Intracomm-instance, but is of type '{1}'"\
+                                                     .format(method, type(self.comm))
+
+        assert isinstance(logger, logging.Logger), "%{0}: logger must be a Logger-instance, but is of type '{1}'"\
+                                                   .format(method, type(logger))
+
+        mess_in = self.comm.recv()
+
+        if mess_in is None:
+            mess_out = ("IDLEE{0}: Worker {1} is idle".format(self.my_rank, self.my_rank), 0)
+            logger.info(mess_out)
+            logger.info("Thus, nothing to do. Job is terminated locally on rank {0}.".format(self.my_rank))
+            self.comm.send(mess_out, dest=0)
+            return True
+        else:
+            logger.info("Worker {0} received input message: {1}".format(self.my_rank, mess_in[0]))
+            if "nmax_warn" in inspect.getfullargspec(self.job).args:
+                worker_stat = self.job(mess_in, *args, logger, nmax_warn=self.nmax_warn)
+            else:
+                worker_stat = self.job(mess_in, *args, logger)
+
+            err_mess = None
+            if worker_stat == -1:
+                mess_out = ("ERROR{0}: Failure was triggered.".format(self.my_rank), worker_stat_fail)
+                logger.critical("Progress was unsuccessful due to a fatal error." +
+                                " Worker {0} triggers termination of all jobs.".format(self.my_rank))
+                err_mess = "Worker {0} met a fatal error. Cannot continue...".format(self.my_rank)
+            elif worker_stat == 0:
+                logger.debug("Progress was successful.")
+                mess_out = ("PASSS{0}:was finished".format(self.my_rank), worker_stat)
+                logger.info("Worker {0} finished a task.".format(self.my_rank))
+            elif worker_stat > 0:
+                logger.debug("Progress faced {0:d} warnings which is still acceptable,".format(int(worker_stat)) +
+                             " but requires investigation of the processed dataset.")
+                mess_out = ("WARNN{0}: Several warnings ({1:d}) have been triggered "
+                            .format(self.my_rank, worker_stat), worker_stat)
+                logger.warning("Worker {0} has met relevant warnings, but could still continue...".format(self.my_rank))
+            else:
+                mess_out = ("ERROR{0}: Unknown worker status ({1:d}) received ".format(self.my_rank, worker_stat),
+                            worker_stat_fail)
+                err_mess = "Worker {0} has produced an unknown worker status and triggers termination of all jobs."\
+                           .format(self.my_rank)
+                logger.critical(err_mess)
+            # communicate to master process
+            self.comm.send(mess_out, dest=0)
+
+            if err_mess:
+                return False
+            else:
+                return True
+
+    @staticmethod
+    def set_and_check_logdir(logdir, distributor_name):
+        """
+        Sets and checks the logging directory.
+        :param logdir: parent directory where log-files will be stored
+        :param distributor_name: name of the distributor-method (used for naming the actual log-directory)
+        :return logdir: adjusted log-directory
+        """
+        method = PyStager.set_and_check_logdir.__name__
+
+        if logdir is None:
+            logdir = os.path.join(os.getcwd(), "pystager_log_{0}".format(distributor_name))
+            os.makedirs(logdir, exist_ok=True)
+            print("%{0}: Default log directory '{1}' is used.".format(method, logdir))
+        else:
+            if not os.path.isdir(logdir):
+                try:
+                    os.mkdir(logdir)
+                    print("%{0}: Created log directory '{1}'.".format(method, logdir))
+                except Exception as err:
+                    print("%{0}: Failed to create desired log directory '{1}'.".format(method, logdir))
+                    raise err
+            else:
+                print("%{0}: Directory '{1}' is used as log directory.".format(method, logdir))
+
+        return logdir
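+    # Illustrative sketch of a job function that PyStager can parallelize (added for clarity, not part of
+    # the original code; 'process_data' and its arguments are hypothetical). The distributor output (here: a
+    # list of year-month datetime objects) is passed first, followed by the static arguments of run(), the
+    # logger and optionally nmax_warn. The function is expected to return 0 on success, the number of warnings
+    # if non-fatal issues occurred, or -1 on a fatal error (cf. manage_worker_jobs above):
+    #
+    #   def process_data(year_months, data_dir, logger, nmax_warn=3):
+    #       nwarns = 0
+    #       for date in year_months:
+    #           logger.info("Processing data of {0} from {1}".format(date.strftime("%Y-%m"), data_dir))
+    #           # ... actual work goes here ...
+    #       return nwarns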
+    @staticmethod
+    def directory_scanner(source_path, lprint=True):
+        """
+        Scans through a directory and returns some information on its content.
+        NOTE: Subdirectories under source_path are not scanned recursively.
+        :param source_path: input directory to scan
+        :param lprint: boolean controlling if the info should be printed (default: True)
+        :return dir_info: dictionary containing info on the scanned directory with the following keys:
+                          "dir_detail_list": overview on the number of files and the required memory
+                          "sub_dir_list": list of subdirectories
+                          "total_size_source": total memory under source_path
+                          "total_num_files": total number of files under source_path
+                          "total_num_directories": total number of directories under source_path
+        """
+        method = PyStager.directory_scanner.__name__
+
+        dir_detail_list = []  # directory details
+        sub_dir_list = []
+        total_size_source = 0
+        total_num_files = 0
+
+        if not os.path.isdir(source_path):
+            raise NotADirectoryError("%{0}: The directory '{1}' does not exist.".format(method, source_path))
+
+        list_directories = os.listdir(source_path)
+
+        for d in list_directories:
+            path = os.path.join(source_path, d)
+            if os.path.isdir(path):
+                sub_dir_list.append(d)
+                sub_dir_list.sort()
+                # size of the files and subdirectories
+                size_dir = subprocess.check_output(['du', '-sc', path])
+                splitted = size_dir.split()  # first item is the size of the folder
+                size = (splitted[0])
+                num_files = len([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
+                dir_detail_list.extend([d, size, num_files])
+                total_num_files = total_num_files + int(num_files)
+                total_size_source = total_size_source + int(size)
+            else:
+                raise NotADirectoryError("%{0}: '{1}' is not a directory.".format(method, path))
+
+        total_num_directories = int(len(list_directories))
+        total_size_source = float(total_size_source / 1000000)
+
+        if lprint:
+            print("===== Info from %{0}: =====".format(method))
+            print("* Total memory size of the source directory: {0:.2f} GB".format(total_size_source))
+            print("* Total number of files in the source directory: {0:d}".format(total_num_files))
+            print("* Total number of directories in the source directory: {0:d}".format(total_num_directories))
+
+        dir_info = {"dir_detail_list": dir_detail_list, "sub_dir_list": sub_dir_list,
+                    "total_size_source": total_size_source, "total_num_files": total_num_files,
+                    "total_num_directories": total_num_directories}
+
+        return dir_info
diff --git a/utils/tfrecords_utils.py b/utils/tfrecords_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..3209ecb8c0b9f97b3bb4ec228b66a25c6929facc
--- /dev/null
+++ b/utils/tfrecords_utils.py
@@ -0,0 +1,226 @@
+# ********** Info **********
+# @Creation: 2021-07-28
+# @Update: 2021-07-28
+# @Author: Michael Langguth
+# @Site: Juelich Supercomputing Centre (JSC) @ FZJ
+# @File: tfrecords_utils.py
+# ********** Info **********
+
+import os
+import glob
+import subprocess as sp
+import sys
+import datetime as dt
+import numpy as np
+import xarray as xr
+import pandas as pd
+import json as js
+import tensorflow as tf
+from helper_utils import ensure_datetime, extract_date, subset_files_on_date
+
+
+class IFS2TFRecords(object):
+    class_name = "IFS2TFRecords"
+
+    date_fmt = "%Y-%m-%dT%H:%M"
+
+    def __init__(self, tfr_dir: str, example_nc_file: str, create_tfr_dir: bool = True):
+
+        method = "%{0}->{1}".format(IFS2TFRecords.class_name, IFS2TFRecords.__init__.__name__)
+        self.tfr_dir = tfr_dir
+        self.meta_data = os.path.join(self.tfr_dir, "metadata.json")
+        self.example_nc_data_file = example_nc_file
+
+        if not os.path.isdir(self.tfr_dir):
+            if create_tfr_dir:
+                os.makedirs(tfr_dir)
+                print("%{0}: TFRecords directory has been created since it did not exist.".format(method))
+            else:
+                raise NotADirectoryError("%{0}: TFRecords-directory does not exist.".format(method) +
+                                         " Either create it manually or set create_tfr_dir to True.")
+
+        meta_dict = self.get_and_write_metadata()
+        self.variables = meta_dict["coordinates"]["variable"]
+        self.data_dim = (meta_dict["shape"]["nvars"], meta_dict["shape"]["nlat"], meta_dict["shape"]["nlon"])
+
+    def get_and_write_metadata(self):
+
+        method = IFS2TFRecords.get_and_write_metadata.__name__
+
+        if not os.path.isfile(self.example_nc_data_file):
+            raise FileNotFoundError("%{0}: netCDF-file '{1}' does not exist.".format(method, self.example_nc_data_file))
+
+        with xr.open_dataset(self.example_nc_data_file) as ds:
+            da = ds.to_array().squeeze(drop=True)
+            vars_nc = da["variable"].values
+            lat, lon = da["lat"].values, da["lon"].values
+            nlat, nlon, nvars = len(lat), len(lon), len(vars_nc)
+
+        meta_dict = {"coordinates": {"variable": vars_nc.tolist(),
+                                     "lat": np.round(lat, decimals=2).tolist(),
+                                     "lon": np.round(lon, decimals=2).tolist()},
+                     "shape": {"nvars": nvars, "nlat": nlat, "nlon": nlon}}
+
+        if not os.path.isfile(self.meta_data):
+            with open(self.meta_data, "w") as js_file:
+                js.dump(meta_dict, js_file)
+
+        return meta_dict
+
+    def get_data_from_file(self, fname):
+
+        method = IFS2TFRecords.get_data_from_file.__name__
+
+        suffix_tfr = ".tfrecords"
+        tfr_file = os.path.join(self.tfr_dir, fname if fname.endswith(suffix_tfr) else fname + suffix_tfr)
+
+        if not os.path.isfile(tfr_file):
+            raise FileNotFoundError("%{0}: TFRecord-file '{1}' does not exist.".format(method, tfr_file))
+
+        data = tf.data.TFRecordDataset(tfr_file)
+
+        data = data.map(IFS2TFRecords.parse_one_data_arr)
+
+        return data
+
+    def write_monthly_data_to_tfr(self, dir_in, hour=None, patt="*.nc"):
+        """
+        Converts all netCDF-files found under dir_in (optionally subsetted to a given hour) into a TFRecord-file.
+        Hint: Use dates = pd.date_range(start_date, end_date, freq="M", normalize=True)
+        and then dates_red = dates[dates.quarter.isin([2, 3])] for generating year_months.
+        """
+
+        method = "%{0}->{1}".format(IFS2TFRecords.class_name, IFS2TFRecords.write_monthly_data_to_tfr.__name__)
+
+        if not os.path.isdir(dir_in):
+            raise NotADirectoryError("%{0}: Passed directory '{1}' does not exist.".format(method, dir_in))
+
+        nc_files = sorted(glob.glob(os.path.join(dir_in, patt)))
+
+        if not nc_files:
+            raise FileNotFoundError("%{0}: No netCDF-files found in '{1}'".format(method, dir_in))
+
+        if hour is None:
+            pass
+        else:
+            nc_files = subset_files_on_date(nc_files, int(hour))
+            # create temporary working directory to merge netCDF-files into a single one
+            # tmp_dir = os.path.join(dir_in, os.path.basename(dir_in) + "_subset")
+            # os.mkdir(tmp_dir)
+            # for nc_file in nc_files:
+            #     os.symlink(nc_file, os.path.join(tmp_dir, os.path.basename(nc_file)))
+
+            # dest_file = os.path.join(tmp_dir, "merged_data.nc")
+            # cmd = "cdo mergetime ${0}/*.nc ${1}".format(tmp_dir, dest_file)
+
+            # _ = sp.check_output(cmd, shell=True)
+
+        try:
+            with xr.open_mfdataset(nc_files) as dataset:
+                print("%{0}: Opening netCDF-files from directory '{1}'...".format(method, dir_in))
+                data_arr_all = dataset.to_array()
+                data_arr_all = data_arr_all.transpose("time", "variable", ...)
+        except Exception as err:
+            print("%{0}: Failed to open netCDF-files from directory '{1}'.".format(method, dir_in))
+            raise err
+
+        dims2check = data_arr_all.isel(time=0).squeeze().shape
+        vars2check = list(data_arr_all["variable"].values)
+        assert dims2check == self.data_dim, \
+            "%{0}: Shape of data from netCDF-file list {1} does not match expected shape {2}"\
+            .format(method, dims2check, self.data_dim)
+
+        assert vars2check == self.variables, "%{0}: Unexpected set of variables {1}".format(method, ",".join(vars2check))
+
+        IFS2TFRecords.write_dataset_to_tfr(data_arr_all, self.tfr_dir)
+
+    @staticmethod
+    def write_dataset_to_tfr(data_arr: xr.DataArray, dirout: str):
+
+        method = IFS2TFRecords.write_dataset_to_tfr.__name__
+
+        assert isinstance(data_arr, xr.DataArray), "%{0}: Input data must be a data array, but is of type {1}."\
+                                                   .format(method, type(data_arr))
+
+        assert os.path.isdir(dirout), "%{0}: Output directory '{1}' does not exist.".format(method, dirout)
+
+        date_fmt = "%Y%m%d%H"
+
+        try:
+            times = data_arr["time"]
+            ntimes = len(times)
+            date_start, date_end = ensure_datetime(times[0].values), ensure_datetime(times[-1].values)
+            tfr_file = os.path.join(dirout, "ifs_data_{0}_{1}.tfrecords".format(date_start.strftime(date_fmt),
+                                                                                date_end.strftime(date_fmt)))
+
+            with tf.io.TFRecordWriter(tfr_file) as tfr_writer:
+                for itime in np.arange(ntimes):
+                    out = IFS2TFRecords.parse_one_data_arr(data_arr.isel(time=itime))
+                    tfr_writer.write(out.SerializeToString())
+
+            print("%{0}: Wrote {1:d} elements to TFRecord-file '{2}'".format(method, ntimes, tfr_file))
+        except Exception as err:
+            print("%{0}: Failed to write DataArray to TFRecord-file. See error below.".format(method))
+            raise err
+
+    @staticmethod
+    def parse_one_data_arr(data_arr):
+
+        method = "%{0}->{1}".format(IFS2TFRecords.class_name, IFS2TFRecords.parse_one_data_arr.__name__)
+
+        date_fmt = IFS2TFRecords.date_fmt
+        # sanity checks
+        if not isinstance(data_arr, xr.DataArray):
+            raise ValueError("%{0}: Input data must be an xarray DataArray, but is of type '{1}'"
+                             .format(method, type(data_arr)))
+
+        assert data_arr.ndim == 3, "%{0}: Data array must have rank 3, but is of rank {1:d}.".format(method,
+                                                                                                     data_arr.ndim)
+        dim_sh = data_arr.shape
+
+        data_dict = {"nvars": IFS2TFRecords._int64_feature(dim_sh[0]),
+                     "nlat": IFS2TFRecords._int64_feature(dim_sh[1]),
+                     "nlon": IFS2TFRecords._int64_feature(dim_sh[2]),
+                     "variable": IFS2TFRecords._bytes_list_feature(data_arr["variable"].values),
+                     "time": IFS2TFRecords._bytes_feature(ensure_datetime(data_arr["time"][0].values)
+                                                          .strftime(date_fmt)),
+                     "data_array": IFS2TFRecords._bytes_feature(IFS2TFRecords.serialize_array(data_arr.values))
+                     }
+
+        # create an Example with wrapped features
+        out = tf.train.Example(features=tf.train.Features(feature=data_dict))
+
+        return out
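+    # Illustrative sketch (added for clarity, not part of the original code) of how the Examples written above
+    # could be decoded again; a function along these lines would typically be passed to dataset.map() when
+    # reading the TFRecord-files back (cf. get_data_from_file). The dtype passed to tf.io.parse_tensor is an
+    # assumption and must match the dtype of the serialized array:
+    #
+    #   @staticmethod
+    #   def decode_one_example(serialized_example):
+    #       feature_description = {"nvars": tf.io.FixedLenFeature([], tf.int64),
+    #                              "nlat": tf.io.FixedLenFeature([], tf.int64),
+    #                              "nlon": tf.io.FixedLenFeature([], tf.int64),
+    #                              "time": tf.io.FixedLenFeature([], tf.string),
+    #                              "data_array": tf.io.FixedLenFeature([], tf.string)}
+    #       example = tf.io.parse_single_example(serialized_example, feature_description)
+    #       data = tf.io.parse_tensor(example["data_array"], out_type=tf.float32)
+    #       return data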
+    # Methods to convert data to TF protocol buffer messages
+    @staticmethod
+    def serialize_array(array):
+        method = IFS2TFRecords.serialize_array.__name__
+
+        # helper that should be used locally only
+        def map_np_dtype(arr, name="arr"):
+            dict_map = {name: tf.dtypes.as_dtype(arr[0].dtype)}
+            return dict_map
+
+        try:
+            dtype_dict = map_np_dtype(array)
+            new = tf.io.serialize_tensor(array)
+        except Exception as err:
+            assert isinstance(array, np.ndarray), "%{0}: Input data must be a numpy array.".format(method)
+            raise err
+        return new, dtype_dict
+
+    @staticmethod
+    def _bytes_feature(value):
+        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
+
+    @staticmethod
+    def _bytes_list_feature(values):
+        return tf.train.Feature(bytes_list=tf.train.BytesList(value=values))
+
+    @staticmethod
+    def _floats_feature(value):
+        return tf.train.Feature(float_list=tf.train.FloatList(value=value))
+
+    @staticmethod
+    def _int64_feature(value):
+        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
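+
+
+# Illustrative usage sketch (added for clarity, not part of the original code; the paths and the example
+# netCDF-file are hypothetical placeholders):
+#
+#   ifs2tfr = IFS2TFRecords("/p/scratch/<project>/tfr_data", "/path/to/example_ifs_file.nc")
+#   ifs2tfr.write_monthly_data_to_tfr("/path/to/netcdf_files_of_one_month", hour=12)
+#   dataset = ifs2tfr.get_data_from_file("ifs_data_<start>_<end>.tfrecords")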