diff --git a/test/run_pytest.sh b/test/run_pytest.sh
new file mode 100644
index 0000000000000000000000000000000000000000..795fac5269d7fc04866cce62fdade9840cc0e656
--- /dev/null
+++ b/test/run_pytest.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+export PYTHONPATH=/p/project/deepacf/deeprain/langguth1/ambs/video_prediction_tools:$PYTHONPATH
+# Name of virtual environment
+VIRT_ENV_NAME="venv_juwels"
+
+
+if [ -z ${VIRTUAL_ENV} ]; then
+  if [[ -f ../video_prediction_tools/${VIRT_ENV_NAME}/bin/activate ]]; then
+    echo "Activating virtual environment..."
+    source ../video_prediction_tools/${VIRT_ENV_NAME}/bin/activate
+  else
+    echo "ERROR: Requested virtual environment ${VIRT_ENV_NAME} not found..."
+    return
+  fi
+fi
+
+
+
+source ../video_prediction_tools/env_setup/modules_preprocess.sh
+python -m pytest test_process_netCDF_v2.py
+
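Note on invocation: the fallback branch above uses "return" rather than "exit", so the script appears to be meant for sourcing from the test directory (an assumption, not stated in the patch); a minimal call could look like:

    cd test
    source run_pytest.sh   # activates venv_juwels if necessary, loads modules_preprocess.sh and runs pytest
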
diff --git a/test/test_process_netCDF_v2.py b/test/test_process_netCDF_v2.py
new file mode 100644
index 0000000000000000000000000000000000000000..a7319d46bb5804f95820cbfd838360c6de17933c
--- /dev/null
+++ b/test/test_process_netCDF_v2.py
@@ -0,0 +1,88 @@
+
+#export PYTHONPATH=/p/project/deepacf/deeprain/bing/ambs/workflow_parallel_frame_prediction:$PYTHONPATH
+from data_preprocess.process_netCDF_v2 import *
+import pytest
+import numpy as np
+import json
+
+# some basic variables
+src_dir_base = "/p/project/deepacf/deeprain/video_prediction_shared_folder/extractedData/test/"
+target_dir_base = "/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/test/"
+year = "2017"
+slices = {"lat_s": 74,
+          "lat_e": 202,
+          "lon_s": 550,
+          "lon_e": 710
+          }
+job_name = "01"
+
+@pytest.fixture(scope="module")
+def preprocessData_case1(src_dir=src_dir_base, target_dir=target_dir_base,\
+                         year=year, job_name=job_name, slices=slices):
+    return PreprocessNcToPkl(src_dir, target_dir, job_name, year, slices)
+
+
+def test_directory_path(preprocessData_case1):
+    assert preprocessData_case1.directory_to_process == os.path.join(src_dir_base, str(year), job_name)
+
+
+def test_get_image_list(preprocessData_case1):
+    # check that the image list is properly sorted
+    imageList = preprocessData_case1.get_images_list()
+    assert imageList[0] == "ecmwf_era5_17010100.nc"
+    assert imageList[-1] == "ecmwf_era5_17010323.nc"
+
+
+def test_filter_not_match_pattern_files(preprocessData_case1):
+    # check that the file names match the pattern and that non-matching files are removed
+    imageList = preprocessData_case1.filter_not_match_pattern_files()
+    assert len(imageList) == len(preprocessData_case1.imageList_total) - 1
+
+
+def test_process_images_to_list_by_month(preprocessData_case1):
+    preprocessData_case1.initia_list_and_stat()
+    preprocessData_case1.process_images_to_list_by_month()
+    # Get the first element of imageList (ecmwf_era5_17010100.nc) and check whether its variable values equal the first element of EU_stack_list
+    im_path = os.path.join(src_dir_base, str(year), job_name, "ecmwf_era5_17010100.nc")
+    with Dataset(im_path, "r") as data_file:
+        times = data_file.variables["time"]
+        time = num2date(times[:], units=times.units, calendar=times.calendar)
+        temp = data_file.variables["T2"][0, slices["lat_s"]:slices["lat_e"], slices["lon_s"]:slices["lon_e"]]
+    # check the shape of EU_stack_list: its length should equal the number of entries in imageList and each element should have the dimensions [height, width, channels (variables)]
+    assert np.array(preprocessData_case1.EU_stack_list[0]).shape == (-slices["lat_s"]+slices["lat_e"], -slices["lon_s"]+slices["lon_e"], preprocessData_case1.nvars)
+    #np.testing.assert_array_almost_equal(np.array(preprocessData_case1.EU_stack_list[0])[:,:,0], temp)
+
+
+def test_save_stat_info(preprocessData_case1):
+    # statistic file to be tested
+    path_test_dir = os.path.join(target_dir_base, "pickle", str(year))
+    fstat2test = os.path.join(path_test_dir, 'stat_' + job_name + '.json')
+    # if the statistic file is not present yet, create it
+    if not os.path.isfile(fstat2test):
+        preprocessData_case1()
+
+    l_stat_exists = os.path.isfile(os.path.join(path_test_dir, 'stat_' + job_name + '.json'))
+    l_pickle_exists = os.path.isfile(os.path.join(path_test_dir, 'X_' + job_name + '.pkl')) and \
+                      os.path.isfile(os.path.join(path_test_dir, 'T_' + job_name + '.pkl'))
+
+    assert l_stat_exists == True
+    assert l_pickle_exists == True
+
+    temp_list = np.array(preprocessData_case1.EU_stack_list)[:,:,:,0]
+    temp_mean = np.mean(temp_list)
+    temp_min = np.min(temp_list)
+    temp_max = np.max(temp_list)
+    msl_list = np.array(preprocessData_case1.EU_stack_list)[:,:,:,1]
+    msl_mean = np.mean(msl_list)
+
+    with open(fstat2test) as json_file:
+        data = json.load(json_file)
+        assert data["T2"][0]["avg"] == pytest.approx(temp_mean, 0.001)
+        assert data["T2"][0]["min"] == pytest.approx(temp_min, 0.001)
+        assert data["T2"][0]["max"] == pytest.approx(temp_max, 0.001)
+        assert data["MSL"][0]["avg"] == pytest.approx(msl_mean, 0.001)
+        assert data["common_stat"][0]["nfiles"] == 70
+
+    #assert preprocessData_case1.save_stat_info.stat_obj["T2"]["min"] == temp_min
+    #assert preprocessData_case1.save_stat_info.stat_obj["T2"]["max"] == temp_max
+
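The shape assertion in test_process_images_to_list_by_month can be sanity-checked directly from the slices dictionary; assuming the default three variables of PreprocessNcToPkl ("T2", "MSL", "gph500"), the expected array shape per image is:

    slices = {"lat_s": 74, "lat_e": 202, "lon_s": 550, "lon_e": 710}
    nvars = 3  # default vars ("T2", "MSL", "gph500")
    expected_shape = (slices["lat_e"] - slices["lat_s"],   # 128 latitude points
                      slices["lon_e"] - slices["lon_s"],   # 160 longitude points
                      nvars)
    print(expected_shape)  # (128, 160, 3)
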
diff --git a/video_prediction_tools/data_preprocess/process_netCDF_v2.py b/video_prediction_tools/data_preprocess/process_netCDF_v2.py
index cf74a54f6ba1a8258bef004f8c50609816937d3f..90009e0b2a61a299e740ac7f5fb0c264e28c55ce 100644
--- a/video_prediction_tools/data_preprocess/process_netCDF_v2.py
+++ b/video_prediction_tools/data_preprocess/process_netCDF_v2.py
@@ -1,168 +1,166 @@
 '''
-Code for processing staged ERA5 data
+Code for processing staged ERA5 data; this is used for the data preprocessing stage of the workflow
 '''
+
+__email__ = "b.gong@fz-juelich.de"
+__author__ = "Bing Gong, Scarlet Stadtler, Michael Langguth"
 
 import os
 import glob
 from netCDF4 import Dataset,num2date
-from statistics import Calc_data_stat
-#import requests
-#from bs4 import BeautifulSoup
-#import urllib.request
 import numpy as np
-#from imageio import imread
-#from scipy.misc import imresize
 import json
 import pickle
+import fnmatch
+from statistics import Calc_data_stat
+import numpy as np
+import json
+
+
+class PreprocessNcToPkl():
+
+    def __init__(self, src_dir=None, target_dir=None, job_name=None, year=None, slices=None, vars=("T2","MSL","gph500")):
+        '''
+        Class that processes the netCDF files of one month into a pickle file
+        args:
+            src_dir    : string, base directory (organised by year) where the netCDF files to be processed are stored
+            target_dir : base directory where the preprocessed data is stored in general (the pickle files end up under .../pickle/[year]/)
+            job_name   : string between "01" and "12"; the job id passed and organised by PyStager, which also corresponds to the month
+            year       : year of the data to be processed
+            slices     : dictionary, e.g. {'lat_e': 202, 'lat_s': 74, 'lon_e': 710, 'lon_s': 550}, indices defining the geographical region of interest
+            vars       : variables to be processed
+        '''
+        # directory_to_process is the month-based directory
+        if int(job_name) > 12 or int(job_name) < 1 or not isinstance(job_name, str): raise ValueError("job_name must be a string between '01' and '12'")
+        self.directory_to_process = os.path.join(src_dir, str(year), str(job_name))
+        if not os.path.exists(self.directory_to_process): raise IOError("The directory_to_process '" + self.directory_to_process + "' does not exist")
+        self.target_dir = os.path.join(target_dir, "pickle", str(year))  # enforce that the preprocessed data is located under the pickle-subdirectory
+        if not os.path.exists(self.target_dir): os.mkdir(self.target_dir)
+        self.job_name = job_name
+        self.slices = slices
+        # target file name to be saved
+        self.target_file = os.path.join(self.target_dir, 'X_' + str(self.job_name) + '.pkl')
+        self.vars = vars
+
+    def __call__(self):
+        """
+        Process the netCDF files in the month-based folder: store the variables of the images and the temporal information in lists and save them to pickle files
+        """
+        if os.path.exists(self.target_file):
+            print(self.target_file, " file exists in the directory ", self.target_dir)
+        else:
+            print("==========Processing files in directory {} =============== ".format(self.directory_to_process))
+            self.get_images_list()
+            self.initia_list_and_stat()
+            self.process_images_to_list_by_month()
+            self.save_images_to_pickle_by_month()
+            self.save_stat_info()
+            self.save_temp_to_pickle_by_month()
+
+    def get_images_list(self):
+        """
+        Get the list of images from directory_to_process and sort them by their date-based file names
+        """
+        self.imageList_total = list(os.walk(self.directory_to_process, topdown = False))[-1][-1]
+        self.filter_not_match_pattern_files()
+        self.imageList = sorted(self.imageList)
+        return self.imageList
+
+    def filter_not_match_pattern_files(self):
+        """
+        Filter the netCDF file names against the expected pattern; any file that does not match is removed from the image list.
+        The pattern is a shell-style (fnmatch) pattern in which [0-9] matches a single digit, e.g. ecmwf_era5_17010219.nc
+        """
+        patt = "ecmwf_era5_[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9].nc"
+        #self.imageList = self.imageList_total
+        #self.imageList = [fnmatch.fnmatch(n, patt) for n in self.imageList_total]
+        self.imageList = fnmatch.filter(self.imageList_total, patt)
+        return self.imageList
+
-# Create image datasets.
-# Processes images and saves them in train, val, test splits.
-def process_data(directory_to_process, target_dir, job_name, slices, vars=("T2","MSL","gph500")):
-    '''
-    :param directory_to_process: directory where netCDF-files are stored to be processed
-    :param target_dir: directory where pickle-files will e stored
-    :param job_name: job_id passed and organized by PyStager
-    :param slices: indices defining geographical region of interest
-    :param vars: variables to be processed
-    :return: Saves pickle-files which contain the sliced meteorological data and temporal information as well
-    '''
-    desired_im_sz = (slices["lat_e"] - slices["lat_s"], slices["lon_e"] - slices["lon_s"])
-    # ToDo: Define a convenient function to create a list containing all files.
-    imageList = list(os.walk(directory_to_process, topdown = False))[-1][-1]
-    imageList = sorted(imageList)
-    EU_stack_list = [0] * (len(imageList))
-    temporal_list = [0] * (len(imageList))
-    nvars = len(vars)
-
-    # ML 2020/04/06 S
-    # Some inits
-    stat_obj = Calc_data_stat(nvars)
-    # ML 2020/04/06 E
-    for j, im_file in enumerate(imageList):
-        try:
-            im_path = os.path.join(directory_to_process, im_file)
-            print('Open following dataset: '+im_path)
-            vars_list = []
-            with Dataset(im_path,'r') as data_file:
-                times = data_file.variables['time']
-                time = num2date(times[:],units=times.units,calendar=times.calendar)
-                for i in range(nvars):
-                    var1 = data_file.variables[vars[i]][0,slices["lat_s"]:slices["lat_e"], slices["lon_s"]:slices["lon_e"]]
-                    stat_obj.acc_stat_loc(i,var1)
-                    vars_list.append(var1)
-
-            EU_stack = np.stack(vars_list, axis = 2)
-            EU_stack_list[j] =list(EU_stack)
-
-            #20200408,bing
-            temporal_list[j] = list(time)
-        except Exception as err:
-            im_path = os.path.join(directory_to_process, im_file)
-            #im = Dataset(im_path, mode = 'r')
-            print("*************ERROR*************", err)
-            print("Error message {} from file {}".format(err,im_file))
-            EU_stack_list[j] = list(EU_stack) # use the previous image as replacement, we can investigate further how to deal with the missing values
-            continue
-
-    X = np.array(EU_stack_list)
-    # ML 2020/07/15: Make use of pickle-files only
-    target_file = os.path.join(target_dir, 'X_' + str(job_name) + '.pkl')
-    with open(target_file, "wb") as data_file:
-        pickle.dump(X,data_file)
-    #target_file = os.path.join(target_dir, 'X_' + str(job_name) + '.pkl')
-    #hkl.dump(X, target_file) #Not optimal!
-    print(target_file, "is saved")
-    # ML 2020/03/31: write json file with statistics
-    stat_obj.finalize_stat_loc(vars)
-    stat_obj.write_stat_json(target_dir,file_id=job_name)
-    # BG 2020/04/08: Also save temporal information to pickle-files
-    temporal_info = np.array(temporal_list)
-    temporal_file = os.path.join(target_dir, 'T_' + str(job_name) + '.pkl')
-    cwd = os.getcwd()
-    with open(temporal_file,"wb") as ftemp:
-        pickle.dump(temporal_info,ftemp)
-    #pickle.dump(temporal_info, open( temporal_file, "wb" ) )
-
-def process_netCDF_in_dir(src_dir,**kwargs):
-    target_dir = kwargs.get("target_dir")
-    job_name = kwargs.get("job_name")
-    directory_to_process = os.path.join(src_dir, job_name)
-    os.chdir(directory_to_process)
-    if not os.path.exists(target_dir): os.mkdir(target_dir)
-    #target_file = os.path.join(target_dir, 'X_' + str(job_name) + '.hkl')
-    # ML 2020/07/15: Make use of pickle-files only
-    target_file = os.path.join(target_dir, 'X_' + str(job_name) + '.hkl')
-    if os.path.exists(target_file):
-        print(target_file," file exists in the directory ", target_dir)
-    else:
-        print ("==========Processing files in directory {} =============== ".format(directory_to_process))
-        process_data(directory_to_process=directory_to_process, **kwargs)
-
-
-# ML 2020/08/03 Not used anymore!
-#def split_data_multiple_years(target_dir,partition,varnames):
-    #"""
-    #Collect all the X_*.hkl data across years and split them to training, val and testing datatset
-    #"""
-    ##target_dirs = [os.path.join(target_dir,year) for year in years]
-    ##os.chdir(target_dir)
-    #splits_dir = os.path.join(target_dir,"splits")
-    #os.makedirs(splits_dir, exist_ok=True)
-    #splits = {s: [] for s in list(partition.keys())}
-    ## ML 2020/05/19 S
-    #vars_uni, varsind, nvars = get_unique_vars(varnames)
-    #stat_obj = Calc_data_stat(nvars)
-    #for split in partition.keys():
-        #values = partition[split]
-        #files = []
-        #X = []
-        #Temporal_X = []
-        #for year in values.keys():
-            #file_dir = os.path.join(target_dir,year)
-            #for month in values[year]:
-                #month = "{0:0=2d}".format(month)
-                #hickle_file = "X_{}.hkl".format(month)
-                ##20200408:bing
-                #temporal_file = "T_{}.pkl".format(month)
-                ##data_file = os.path.join(file_dir,hickle_file)
-                #data_file = os.path.join(file_dir,hickle_file)
-                #temporal_data_file = os.path.join(file_dir,temporal_file)
-                #files.append(data_file)
-                #data = hkl.load(data_file)
-                #with open(temporal_data_file,"rb") as ftemp:
-                    #temporal_data = pickle.load(ftemp)
-                #X = X + list(data)
-                #Temporal_X = Temporal_X + list(temporal_data)
-                ## process stat-file:
-                #stat_obj.acc_stat_master(file_dir,int(month))
-        #X = np.array(X)
-        #Temporal_X = np.array(Temporal_X)
-        #print("==================={}=====================".format(split))
-        #print ("Sources for {} dataset are {}".format(split,files))
-        #print("Number of images in {} dataset is {} ".format(split,len(X)))
-        #print ("dataset shape is {}".format(np.array(X).shape))
-        ## ML 2020/07/15: Make use of pickle-files only
-        #with open(os.path.join(splits_dir , 'X_' + split + '.pkl'),"wb") as data_file:
-            #pickle.dump(X,data_file,protocol=4)
-        ##hkl.dump(X, os.path.join(splits_dir , 'X_' + split + '.hkl'))
-
-        #with open(os.path.join(splits_dir,"T_"+split + ".pkl"),"wb") as temp_file:
-            #pickle.dump(Temporal_X, temp_file)
-
-        #hkl.dump(files, os.path.join(splits_dir,'sources_' + split + '.hkl'))
-
-    ## write final statistics json-file
-    #stat_obj.finalize_stat_master(target_dir,vars_uni)
-    #stat_obj.write_stat_json(splits_dir)
+    def initia_list_and_stat(self):
+        """
+        Initialise the lists that store the images and the temporal information, and initialise the statistics object (Calc_data_stat)
+        """
+        self.EU_stack_list = [0] * (len(self.imageList))
+        self.temporal_list = [0] * (len(self.imageList))
+        self.nvars = len(self.vars)
+        self.stat_obj = Calc_data_stat(self.nvars)
+
+    def process_images_to_list_by_month(self):
+        """
+        Read the selected variables from each netCDF file and concatenate the variables of all images in directory_to_process into the list EU_stack_list.
+        EU_stack_list has the dimensions [number_of_images, height, width, number_of_variables];
+        temporal_list is a 1-dim list of timestamps containing the timestamps of all netCDF files.
+ """ + counter = 0 + for j, im_file in enumerate(self.imageList): + try: + im_path = os.path.join(self.directory_to_process, im_file) + vars_list = [] + with Dataset(im_path,'r') as data_file: + times = data_file.variables['time'] + time = num2date(times[:],units=times.units,calendar=times.calendar) + for i in range(self.nvars): + var1 = data_file.variables[self.vars[i]][0,self.slices["lat_s"]:self.slices["lat_e"], self.slices["lon_s"]:self.slices["lon_e"]] + self.stat_obj.acc_stat_loc(i,var1) + vars_list.append(var1) + EU_stack = np.stack(vars_list, axis=2) + self.EU_stack_list[j] =list(EU_stack) + self.temporal_list[j] = list(time) + print('Open following dataset: '+im_path + "was successfully processed") + except Exception as err: + #if the error occurs at the first nc file, we will skip it + if counter == 0: + print("Counter:",counter) + else: + im_path = os.path.join(self.directory_to_process, im_file) + print("*************ERROR*************", err) + print("Error message {} from file {}".format(err,im_file)) + self.EU_stack_list[j] = list(EU_stack) # use the previous image as replacement, we can investigate further how to deal with the missing values + counter += 1 + continue + + + + def save_images_to_pickle_by_month(self): + """ + save list of variables from all the images to pickle file + """ + X = np.array(self.EU_stack_list) + target_file = os.path.join(self.target_dir, 'X_' + str(self.job_name) + '.pkl') + with open(target_file, "wb") as data_file: + pickle.dump(X,data_file) + return True + + + + def save_temp_to_pickle_by_month(self): + """ + save the temporal information to pickle file + """ + temporal_info = np.array(self.temporal_list) + temporal_file = os.path.join(self.target_dir, 'T_' + str(self.job_name) + '.pkl') + with open(temporal_file,"wb") as ftemp: + pickle.dump(temporal_info,ftemp) + + def save_stat_info(self): + """ + save the stat information to the target dir + """ + self.stat_obj.finalize_stat_loc(self.vars) + self.stat_obj.write_stat_json(self.target_dir,file_id=self.job_name) + + - - + diff --git a/video_prediction_tools/main_scripts/main_preprocess_data_step1.py b/video_prediction_tools/main_scripts/main_preprocess_data_step1.py index 1d23ef4f6b92a6c979f615cca8b9821b18a4c908..f6738692c9bba114406d07faef5923e038ac70de 100755 --- a/video_prediction_tools/main_scripts/main_preprocess_data_step1.py +++ b/video_prediction_tools/main_scripts/main_preprocess_data_step1.py @@ -4,15 +4,15 @@ import sys import subprocess import logging import time +import os +import argparse +import json from utils.external_function import directory_scanner from utils.external_function import load_distributor from utils.external_function import hash_directory from utils.external_function import md5 from data_preprocess.process_netCDF_v2 import * from metadata import MetaData as MetaData -import os -import argparse -import json def main(): @@ -21,23 +21,22 @@ def main(): parser.add_argument("--destination_dir", type=str, default="/p/scratch/deepacf/bing/processData_size_64_64_3_3t_norm") parser.add_argument("--script_dir","-scr_dir",dest="script_dir",type=str) parser.add_argument("--years", "-y", dest="years") - parser.add_argument("--checksum_status", type=int, default=0) parser.add_argument("--rsync_status", type=int, default=1) parser.add_argument("--vars", nargs="+",default = ["T2","T2","T2"]) #"MSL","gph500" - parser.add_argument("--lat_s", type=int, default=74+32) - parser.add_argument("--lat_e", type=int, default=202-32) - parser.add_argument("--lon_s", type=int, 
diff --git a/video_prediction_tools/main_scripts/main_preprocess_data_step1.py b/video_prediction_tools/main_scripts/main_preprocess_data_step1.py
index 1d23ef4f6b92a6c979f615cca8b9821b18a4c908..f6738692c9bba114406d07faef5923e038ac70de 100755
--- a/video_prediction_tools/main_scripts/main_preprocess_data_step1.py
+++ b/video_prediction_tools/main_scripts/main_preprocess_data_step1.py
@@ -4,15 +4,15 @@
 import sys
 import subprocess
 import logging
 import time
+import os
+import argparse
+import json
 from utils.external_function import directory_scanner
 from utils.external_function import load_distributor
 from utils.external_function import hash_directory
 from utils.external_function import md5
 from data_preprocess.process_netCDF_v2 import *
 from metadata import MetaData as MetaData
-import os
-import argparse
-import json
 
 def main():
@@ -21,23 +21,22 @@
     parser.add_argument("--destination_dir", type=str, default="/p/scratch/deepacf/bing/processData_size_64_64_3_3t_norm")
     parser.add_argument("--script_dir","-scr_dir",dest="script_dir",type=str)
     parser.add_argument("--years", "-y", dest="years")
-    parser.add_argument("--checksum_status", type=int, default=0)
     parser.add_argument("--rsync_status", type=int, default=1)
     parser.add_argument("--vars", nargs="+",default = ["T2","T2","T2"]) #"MSL","gph500"
-    parser.add_argument("--lat_s", type=int, default=74+32)
-    parser.add_argument("--lat_e", type=int, default=202-32)
-    parser.add_argument("--lon_s", type=int, default=550+16+32)
-    parser.add_argument("--lon_e", type=int, default=710-16-32)
+    parser.add_argument("--lat_s", type=int, default=106)
+    parser.add_argument("--lat_e", type=int, default=170)
+    parser.add_argument("--lon_s", type=int, default=598)
+    parser.add_argument("--lon_e", type=int, default=662)
     parser.add_argument("--experimental_id","-exp_id",dest="exp_id",type=str, default="exp1",\
                         help="Experimental identifier helping to distinguish between different experiments.")
     args = parser.parse_args()
 
     current_path = os.getcwd()
     years = args.years
-    source_dir = os.path.join(args.source_dir,str(years))+"/"
+    source_dir = args.source_dir
+    source_dir_full = os.path.join(source_dir,str(years))+"/"
     destination_dir = args.destination_dir
     scr_dir = args.script_dir
-    checksum_status = args.checksum_status
     rsync_status = args.rsync_status
     vars = args.vars
@@ -81,27 +80,9 @@
     print('PyStager is Running .... ')
 
     # ================================== ALL Nodes: Read-in parameters ====================================== #
 
-    #Bing: using the args to configure the directories
-    # fileName = "parameters_process_netCDF.dat"  # input parameters file
-    # fileObj = open(fileName)
-    # params = {}
-    #
-    # for line in fileObj:
-    #     line = line.strip()
-    #     read_in_value = line.split("=")
-    #     if len(read_in_value) == 2:
-    #         params[read_in_value[0].strip()] = read_in_value[1].strip()
-    #
-    # # input from the user:
-    # source_dir = str(params["Source_Directory"])
-    # destination_dir = str(params["Destination_Directory"])
-    # log_dir = str(params["Log_Directory"])
-    # rsync_status = int(params["Rsync_Status"])
-    # checksum_status = int(params["Checksum_Status"])
     # check the existence of teh folders :
-
-    if not os.path.exists(source_dir): # check if the source dir. is existing
+    if not os.path.exists(source_dir_full): # check if the source dir. is existing
         if my_rank == 0:
             logging.critical('The source does not exist')
             logging.info('exit status : 1')
@@ -109,11 +90,10 @@
             sys.exit(1)
 
-    # ML 2020/04/26
     # Expand destination_dir-variable by searching for netCDF-files in source_dir and processing the file from the first list element to obtain all relevant (meta-)data.
     if my_rank == 0:
-        data_files_list = glob.glob(source_dir+"/**/*.nc",recursive=True)
-        if not data_files_list: raise ValueError("Could not find any data to be processed in '"+source_dir+"'")
+        data_files_list = glob.glob(source_dir_full+"/**/*.nc",recursive=True)
+        if not data_files_list: raise IOError("Could not find any data to be processed in '"+source_dir_full+"'")
 
         md = MetaData(suffix_indir=destination_dir,exp_id=exp_id,data_filename=data_files_list[0],slices=slices,variables=vars)
         # modify Batch scripts if metadata has been retrieved for the first time (md.status = "new")
@@ -153,7 +133,7 @@
     print(" # ============== Directory scanner : start ==================# ")
 
-    ret_dir_scanner = directory_scanner(source_dir)
+    ret_dir_scanner = directory_scanner(source_dir_full)
     print(ret_dir_scanner)
 
     dir_detail_list = ret_dir_scanner[0]
     sub_dir_list = ret_dir_scanner[1]
@@ -191,9 +171,6 @@
             message_in = comm.recv()
             logging.info(message_in)
             message_counter = message_counter + 1
-        #Bing
-        # ML 2020/05/19: Splitting now controlled from batch-script
-        # split_data(target_dir=destination_dir, partition = [0.6, 0.2, 0.2])
 
         # stamp the end of the runtime
         end = time.time()
@@ -220,33 +197,13 @@
             #print(job)
             #grib_2_netcdf(rot_grid,source_dir, destination_dir, job)
-
-            # creat a checksum ( hash) from the source folder.
-            if checksum_status == 1:
-                hash_directory(source_dir, job, current_path, "source")
 
             if rsync_status == 1:
-                # prepare the rsync commoand to be excexuted by the worker node
-                #rsync_str = ("rsync -r " + source_dir + job + "/" + " " + destination_dir + "/" + job)
-                #os.system(rsync_str)
-
-                #process_era5_in_dir(job, src_dir=source_dir, target_dir=destination_dir)
-                # ML 2020/06/09: workaround to get correct destination_dir obtained by the master node
-                destination_dir = os.path.join(MetaData.get_destdir_jsontmp(tmp_dir=current_path),"pickle",years)
-                process_netCDF_in_dir(job_name=job, src_dir=source_dir, target_dir=destination_dir,slices=slices,vars=vars)
-
-                if checksum_status == 1:
-                    hash_directory(destination_dir, job, current_path, "destination")
-                    os.chdir(current_path)
-                    source_hash_text = "source" + "_"+ job +"_hashed.txt"
-                    destination_hash_text = "destination" + "_"+ job +"_hashed.txt"
-                    if md5(source_hash_text) == md5(destination_hash_text):
-                        msg_out = 'source: ' + job +' and destination: ' + job + ' files are identical'
-                        print(msg_out)
-
-                    else:
-                        msg_out = 'integrity of source: ' + job +' and destination: ' + job +' files could not be verified'
-                        print(msg_out)
+
+                # ML 2020/06/09: workaround to get correct destination_dir obtained by the master node
+                destination_dir = MetaData.get_destdir_jsontmp(tmp_dir=current_path)
+                process_data = PreprocessNcToPkl(src_dir=source_dir,target_dir=destination_dir,year=years,job_name=job,slices=slices,vars=vars)
+                process_data()
+                #process_netCDF_in_dir(job_name=job, src_dir=source_dir, target_dir=destination_dir,slices=slices,vars=vars)
 
             # Send : the finish of the sync message back to master node
             message_out = ('Node:', str(my_rank), 'finished :', "", '\r\n')
@@ -259,4 +216,3 @@
 
 if __name__ == "__main__":
     main()
-
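A quick check of the changed argparse defaults: the new integer values are simply the evaluated forms of the removed arithmetic expressions, so the default region of interest (a 64 x 64 grid-point box) is unchanged:

    # old expression       -> new default
    # lat_s: 74 + 32        = 106
    # lat_e: 202 - 32       = 170
    # lon_s: 550 + 16 + 32  = 598
    # lon_e: 710 - 16 - 32  = 662
    # height = 170 - 106 = 64, width = 662 - 598 = 64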