Commit 03c36688 authored by gong1

remove checksum in pystager and replace the process_netcdf function with the updated one from data_preprocess.process_netCDF_v2
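In effect, the worker-side body changes from a checksum-wrapped function call to a single callable preprocessing object. A minimal sketch of the new call, assuming the repo's data_preprocess.process_netCDF_v2 module exposes the PreprocessNcToPkl class used further down in this diff (all values are hypothetical placeholders; in the script they come from argparse and the job distributor):

    from data_preprocess.process_netCDF_v2 import PreprocessNcToPkl  # repo module

    source_dir = "/path/to/staged/era5/2017/"           # hypothetical
    destination_dir = "/path/to/processed/pickle/2017"  # hypothetical
    job = "01"                                          # one subdirectory of the source tree
    slices = {"lat_s": 106, "lat_e": 170, "lon_s": 550, "lon_e": 614}  # hypothetical layout
    vars = ["T2", "T2", "T2"]

    # the new interface is a callable object: construct once, then invoke it
    # to run the netCDF-to-pickle conversion for this job
    process_data = PreprocessNcToPkl(src_dir=source_dir, target_dir=destination_dir,
                                     job_name=job, slices=slices, vars=vars)
    process_data()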
parent 502a1504
'''
Code for processing staged ERA5 data; this is used for the DataPreprocessing stage of the workflow.
'''
@@ -4,15 +4,15 @@ import sys
import subprocess
import logging
import time
import os
import argparse
import json
from utils.external_function import directory_scanner
from utils.external_function import load_distributor
from utils.external_function import hash_directory
from utils.external_function import md5
from data_preprocess.process_netCDF_v2 import *
from metadata import MetaData as MetaData
import os
import argparse
import json
def main():
@@ -21,7 +21,6 @@ def main():
parser.add_argument("--destination_dir", type=str, default="/p/scratch/deepacf/bing/processData_size_64_64_3_3t_norm")
parser.add_argument("--script_dir","-scr_dir",dest="script_dir",type=str)
parser.add_argument("--years", "-y", dest="years")
parser.add_argument("--checksum_status", type=int, default=0)
parser.add_argument("--rsync_status", type=int, default=1)
parser.add_argument("--vars", nargs="+",default = ["T2","T2","T2"]) #"MSL","gph500"
parser.add_argument("--lat_s", type=int, default=74+32)
@@ -37,7 +36,6 @@ def main():
source_dir = os.path.join(args.source_dir,str(years))+"/"
destination_dir = args.destination_dir
scr_dir = args.script_dir
checksum_status = args.checksum_status
rsync_status = args.rsync_status
vars = args.vars
@@ -81,26 +79,8 @@ def main():
print('PyStager is Running .... ')
# ================================== ALL Nodes: Read-in parameters ====================================== #
#Bing: using the args to configure the directories
# fileName = "parameters_process_netCDF.dat" # input parameters file
# fileObj = open(fileName)
# params = {}
#
# for line in fileObj:
# line = line.strip()
# read_in_value = line.split("=")
# if len(read_in_value) == 2:
# params[read_in_value[0].strip()] = read_in_value[1].strip()
#
# # input from the user:
# source_dir = str(params["Source_Directory"])
# destination_dir = str(params["Destination_Directory"])
# log_dir = str(params["Log_Directory"])
# rsync_status = int(params["Rsync_Status"])
# checksum_status = int(params["Checksum_Status"])
# check the existence of the folders:
if not os.path.exists(source_dir): # check whether the source directory exists
if my_rank == 0:
logging.critical('The source does not exist')
@@ -109,7 +89,6 @@ def main():
sys.exit(1)
# ML 2020/04/26
# Expand destination_dir-variable by searching for netCDF-files in source_dir and processing the first file in the list to obtain all relevant (meta-)data.
if my_rank == 0:
data_files_list = glob.glob(source_dir+"/**/*.nc",recursive=True)
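The master node scans source_dir recursively for netCDF files and later uses the first hit to derive the metadata. A minimal sketch of that scan, with a guard against an empty result (directory name hypothetical):

    import glob
    import os
    import sys

    source_dir = "/path/to/staged/era5/2017"  # hypothetical
    # recursive=True lets "**" match files in arbitrarily deep subdirectories
    data_files_list = glob.glob(os.path.join(source_dir, "**", "*.nc"), recursive=True)
    if not data_files_list:
        sys.exit("No netCDF files found under " + source_dir)
    first_file = data_files_list[0]  # processed to obtain all relevant (meta-)data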
@@ -191,9 +170,6 @@ def main():
message_in = comm.recv()
logging.info(message_in)
message_counter = message_counter + 1
#Bing
# ML 2020/05/19: Splitting now controlled from batch-script
# split_data(target_dir=destination_dir, partition = [0.6, 0.2, 0.2])
# stamp the end of the runtime
end = time.time()
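The receive loop above is one side of PyStager's master-worker pattern: rank 0 collects one completion message per worker while the other ranks process their share of jobs. A minimal sketch of that pattern, assuming mpi4py (the actual job assignment via load_distributor is not shown here):

    # run with e.g.: mpirun -np 4 python pystager_sketch.py
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    my_rank = comm.Get_rank()
    p = comm.Get_size()

    if my_rank == 0:
        # master: wait for one finish message from each of the p-1 workers
        message_counter = 1
        while message_counter <= p - 1:
            message_in = comm.recv()  # defaults to ANY_SOURCE
            print(message_in)
            message_counter = message_counter + 1
    else:
        # worker: process the assigned jobs here, then report back to the master
        message_out = ('Node:', str(my_rank), 'finished :', "", '\r\n')
        comm.send(message_out, dest=0)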
@@ -220,33 +196,13 @@ def main():
#print(job)
#grib_2_netcdf(rot_grid,source_dir, destination_dir, job)
# create a checksum (hash) from the source folder.
if checksum_status == 1:
hash_directory(source_dir, job, current_path, "source")
if rsync_status == 1:
# prepare the rsync command to be executed by the worker node
#rsync_str = ("rsync -r " + source_dir + job + "/" + " " + destination_dir + "/" + job)
#os.system(rsync_str)
#process_era5_in_dir(job, src_dir=source_dir, target_dir=destination_dir)
# ML 2020/06/09: workaround to get correct destination_dir obtained by the master node
destination_dir = os.path.join(MetaData.get_destdir_jsontmp(tmp_dir=current_path),"pickle",years)
process_netCDF_in_dir(job_name=job, src_dir=source_dir, target_dir=destination_dir,slices=slices,vars=vars)
if checksum_status == 1:
hash_directory(destination_dir, job, current_path, "destination")
os.chdir(current_path)
source_hash_text = "source" + "_"+ job +"_hashed.txt"
destination_hash_text = "destination" + "_"+ job +"_hashed.txt"
if md5(source_hash_text) == md5(destination_hash_text):
msg_out = 'source: ' + job +' and destination: ' + job + ' files are identical'
print(msg_out)
else:
msg_out = 'integrity of source: ' + job +' and destination: ' + job +' files could not be verified'
print(msg_out)
process_data = PreprocessNcToPkl(src_dir=src_dir,target_dir=destination_dir,job_name=job,slices=slices,vars=vars)
process_data()
#process_netCDF_in_dir(job_name=job, src_dir=source_dir, target_dir=destination_dir,slices=slices,vars=vars)
# Send the finish-of-sync message back to the master node
message_out = ('Node:', str(my_rank), 'finished :', "", '\r\n')
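For context, the deleted checksum path above hashed the source and destination trees into text files and compared their md5 digests. A minimal, self-contained sketch of such a digest comparison using hashlib (file names hypothetical; the files must exist):

    import hashlib

    def md5(fname):
        # stream the file in chunks so large hash listings do not exhaust memory
        hash_md5 = hashlib.md5()
        with open(fname, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    source_hash_text = "source_job_hashed.txt"            # hypothetical
    destination_hash_text = "destination_job_hashed.txt"  # hypothetical
    if md5(source_hash_text) == md5(destination_hash_text):
        print("source and destination files are identical")
    else:
        print("integrity of source and destination files could not be verified")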