diff --git a/video_prediction_tools/data_preprocess/process_netCDF_v2.py b/video_prediction_tools/data_preprocess/process_netCDF_v2.py
index 7e8370e97d717de3d794fbf78dcafb3ef6b97478..c102922b925bbf8a0bec5ded2f70bcb1e484fadb 100644
--- a/video_prediction_tools/data_preprocess/process_netCDF_v2.py
+++ b/video_prediction_tools/data_preprocess/process_netCDF_v2.py
@@ -1,4 +1,4 @@
 '''
 Code for processing staged ERA5 data, this is used for the DataPreprocessing stage of workflow
 '''
@@ -154,10 +154,10 @@ class PreprocessNcToPkl():
+
-
-
+
diff --git a/video_prediction_tools/main_scripts/main_preprocess_data_step1.py b/video_prediction_tools/main_scripts/main_preprocess_data_step1.py
index 1d23ef4f6b92a6c979f615cca8b9821b18a4c908..cf00ab4e10a1b59c2e338fb929d68b7b1577bc56 100755
--- a/video_prediction_tools/main_scripts/main_preprocess_data_step1.py
+++ b/video_prediction_tools/main_scripts/main_preprocess_data_step1.py
@@ -4,15 +4,15 @@
 import sys
 import subprocess
 import logging
 import time
+import os
+import argparse
+import json
 from utils.external_function import directory_scanner
 from utils.external_function import load_distributor
 from utils.external_function import hash_directory
 from utils.external_function import md5
 from data_preprocess.process_netCDF_v2 import *
 from metadata import MetaData as MetaData
-import os
-import argparse
-import json

 def main():
@@ -21,7 +21,6 @@ def main():
     parser.add_argument("--destination_dir", type=str, default="/p/scratch/deepacf/bing/processData_size_64_64_3_3t_norm")
     parser.add_argument("--script_dir","-scr_dir",dest="script_dir",type=str)
     parser.add_argument("--years", "-y", dest="years")
-    parser.add_argument("--checksum_status", type=int, default=0)
     parser.add_argument("--rsync_status", type=int, default=1)
     parser.add_argument("--vars", nargs="+",default = ["T2","T2","T2"]) #"MSL","gph500"
     parser.add_argument("--lat_s", type=int, default=74+32)
@@ -37,7 +36,6 @@ def main():
     source_dir = os.path.join(args.source_dir,str(years))+"/"
     destination_dir = args.destination_dir
     scr_dir = args.script_dir
-    checksum_status = args.checksum_status
     rsync_status = args.rsync_status
     vars = args.vars
@@ -81,26 +79,8 @@ def main():
     print('PyStager is Running .... ')

     # ================================== ALL Nodes: Read-in parameters ====================================== #
-    #Bing: using the args to configure the directories
-    # fileName = "parameters_process_netCDF.dat" # input parameters file
-    # fileObj = open(fileName)
-    # params = {}
-    #
-    # for line in fileObj:
-    #     line = line.strip()
-    #     read_in_value = line.split("=")
-    #     if len(read_in_value) == 2:
-    #         params[read_in_value[0].strip()] = read_in_value[1].strip()
-    #
-    # # input from the user:
-    # source_dir = str(params["Source_Directory"])
-    # destination_dir = str(params["Destination_Directory"])
-    # log_dir = str(params["Log_Directory"])
-    # rsync_status = int(params["Rsync_Status"])
-    # checksum_status = int(params["Checksum_Status"])

     # check the existence of the folders:
-
     if not os.path.exists(source_dir):  # check if the source dir. is existing
         if my_rank == 0:
             logging.critical('The source does not exist')
@@ -109,7 +89,6 @@ def main():
             sys.exit(1)

-
     # ML 2020/04/26
     # Expand destination_dir-variable by searching for netCDF-files in source_dir and processing the file from the first list element to obtain all relevant (meta-)data.
     if my_rank == 0:
         data_files_list = glob.glob(source_dir+"/**/*.nc",recursive=True)
@@ -191,9 +170,6 @@ def main():
             message_in = comm.recv()
             logging.info(message_in)
             message_counter = message_counter + 1
-            #Bing
-            # ML 2020/05/19: Splitting now controlled from batch-script
-            # split_data(target_dir=destination_dir, partition = [0.6, 0.2, 0.2])

         # stamp the end of the runtime
         end = time.time()
@@ -220,33 +196,13 @@ def main():
         #print(job)
         #grib_2_netcdf(rot_grid,source_dir, destination_dir, job)
-
-        # creat a checksum ( hash) from the source folder.
-        if checksum_status == 1:
-            hash_directory(source_dir, job, current_path, "source")

         if rsync_status == 1:
             # prepare the rsync command to be executed by the worker node
             #rsync_str = ("rsync -r " + source_dir + job + "/" + " " + destination_dir + "/" + job)
             #os.system(rsync_str)
-
-            #process_era5_in_dir(job, src_dir=source_dir, target_dir=destination_dir)
-            # ML 2020/06/09: workaround to get correct destination_dir obtained by the master node
+
+            # ML 2020/06/09: workaround to get correct destination_dir obtained by the master node
             destination_dir = os.path.join(MetaData.get_destdir_jsontmp(tmp_dir=current_path),"pickle",years)
-            process_netCDF_in_dir(job_name=job, src_dir=source_dir, target_dir=destination_dir,slices=slices,vars=vars)
-
-            if checksum_status == 1:
-                hash_directory(destination_dir, job, current_path, "destination")
-                os.chdir(current_path)
-                source_hash_text = "source" + "_"+ job +"_hashed.txt"
-                destination_hash_text = "destination" + "_"+ job +"_hashed.txt"
-                if md5(source_hash_text) == md5(destination_hash_text):
-                    msg_out = 'source: ' + job +' and destination: ' + job + ' files are identical'
-                    print(msg_out)
-
-                else:
-                    msg_out = 'integrity of source: ' + job +' and destination: ' + job +' files could not be verified'
-                    print(msg_out)
+            process_data = PreprocessNcToPkl(src_dir=source_dir,target_dir=destination_dir,job_name=job,slices=slices,vars=vars)
+            process_data()
+            #process_netCDF_in_dir(job_name=job, src_dir=source_dir, target_dir=destination_dir,slices=slices,vars=vars)

         # Send the finish-of-sync message back to the master node
         message_out = ('Node:', str(my_rank), 'finished :', "", '\r\n')
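Note on the worker-side change in the last hunk: per-job processing is swapped from the procedural helper process_netCDF_in_dir to the callable class PreprocessNcToPkl (pulled in through the star import of data_preprocess.process_netCDF_v2). The sketch below illustrates the construct-then-call pattern the worker now uses; it assumes the keyword arguments shown in the diff (src_dir, target_dir, job_name, slices, vars), that slices is a dict of lat/lon crop indices, and that the class implements __call__. Paths and most index values are placeholders, not the script's real configuration.

# Hedged usage sketch -- not part of the patch above.
from data_preprocess.process_netCDF_v2 import PreprocessNcToPkl

# Crop indices: lat_s mirrors the --lat_s default (74+32) visible in the diff;
# the other three values are placeholders for whatever --lat_e/--lon_s/--lon_e yield.
slices = {"lat_s": 74 + 32, "lat_e": 170, "lon_s": 598, "lon_e": 662}
vars = ["T2", "T2", "T2"]  # matches the --vars default

# One MPI worker handles one subdirectory ("job") of the staged ERA5 tree,
# writing pickle files into the destination resolved by the master node.
process_data = PreprocessNcToPkl(src_dir="/path/to/era5/2017",       # placeholder
                                 target_dir="/path/to/pickle/2017",  # placeholder
                                 job_name="01",
                                 slices=slices,
                                 vars=vars)
process_data()  # __call__ runs the NetCDF-to-pickle conversion for this job

If the real constructor differs, treat this only as an illustration of why two added lines can replace the dozen removed ones: construction captures the job configuration, and the call performs the work.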