diff --git a/DataPreprocess/mpi_split_data_multi_years.py b/DataPreprocess/mpi_split_data_multi_years.py
index a4e92ea8c65214cde86ac53c30fd79359162968d..363aa19986db0821193be3aa8d1e31065b6b13fd 100644
--- a/DataPreprocess/mpi_split_data_multi_years.py
+++ b/DataPreprocess/mpi_split_data_multi_years.py
@@ -5,12 +5,15 @@ import json
 #add parser arguments
 parser = argparse.ArgumentParser()
+#parser.add_argument("--source_dir", type=str, default="/p/scratch/deepacf/bing/extractedData/")
 parser.add_argument("--destination_dir","-dest",dest="destination_dir",type=str, default="/p/scratch/deepacf/bing/processData_size_64_64_3_3t_norm")
+parser.add_argument("--varnames","-vars",dest="varnames", nargs = '+')
 #parser.add_argument("--partition","-part",dest="partition",type=json.loads)
 #                    help="--partition allows to control the splitting of the processed data in training, test and validation data. Pass a dictionary-like string.")
-#parser.add_argument("--partition", type=str, default="")
+
 args = parser.parse_args()
 target_dir = args.destination_dir
+varnames = args.varnames
 #partition = args.partition
 #all_keys = partition.keys()
@@ -20,7 +23,7 @@ target_dir = args.destination_dir
 partition = {
         "train":{
             "2017":[1]
-                 },
+              },
         "val":
             {"2017":[2]
              },
@@ -33,6 +36,6 @@ comm = MPI.COMM_WORLD
 my_rank = comm.Get_rank()  # rank of the node
 p = comm.Get_size()  # number of assigned nods
 if my_rank == 0:  # node is master
-    split_data_multiple_years(target_dir=target_dir,partition=partition)
+    split_data_multiple_years(target_dir=target_dir,partition=partition,varnames=varnames)
 else:
     pass
diff --git a/DataPreprocess/mpi_stager_v2_process_netCDF.py b/DataPreprocess/mpi_stager_v2_process_netCDF.py
index 01a98227225bc9cff3ede7c21382ac1711a9facd..7f7bed47f2c997b541dd58ac6d5ba90154bffda8 100755
--- a/DataPreprocess/mpi_stager_v2_process_netCDF.py
+++ b/DataPreprocess/mpi_stager_v2_process_netCDF.py
@@ -105,7 +105,6 @@ def main():
         logging.critical('The Destination does not exist')
         logging.info('Create new destination dir')
         os.makedirs(destination_dir,exist_ok=True)
-        #if not os.path.exists(destination_dir): os.mkdir(destination_dir)
 
     if my_rank == 0:  # node is master:
         # ==================================== Master : Directory scanner ================================= #
@@ -151,10 +150,8 @@ def main():
                     logging.info(message_in)
                     message_counter = message_counter + 1
             #Bing
-            split_data(target_dir=destination_dir, partition = [0.6, 0.2, 0.2])
-            # ML 2020/04/06 S
-            create_stat_json_master(destination_dir,p-idle_counter,vars)
-            # ML 2020/04/06 E
+            # ML 2020/05/19: Splitting now controlled from batch-script
+            # split_data(target_dir=destination_dir, partition = [0.6, 0.2, 0.2])
 
             # stamp the end of the runtime
             end = time.time()
diff --git a/DataPreprocess/process_netCDF_v2.py b/DataPreprocess/process_netCDF_v2.py
index 798be6e5660b75ca87fe7f475eb859a3c63fdd41..21bacb746a1c95a576dabd3e548d5f5a00dafdb0 100644
--- a/DataPreprocess/process_netCDF_v2.py
+++ b/DataPreprocess/process_netCDF_v2.py
@@ -15,8 +15,6 @@ import hickle as hkl
 import json
 import pickle
 
-
-
 # Create image datasets.
 # Processes images and saves them in train, val, test splits.
 def process_data(directory_to_process, target_dir, job_name, slices, vars=("T2","MSL","gph500")):
@@ -26,7 +24,7 @@ def process_data(directory_to_process, target_dir, job_name, slices, vars=("T2",
     imageList = sorted(imageList)
     EU_stack_list = [0] * (len(imageList))
     temporal_list = [0] * (len(imageList))
-    len_vars = len(vars)
+    nvars = len(vars)
     #X = np.zeros((len(splits[split]),) + desired_im_sz + (3,), np.uint8)
     #print(X)
     #print('shape of X' + str(X.shape))
@@ -36,8 +34,7 @@ def process_data(directory_to_process, target_dir, job_name, slices, vars=("T2",
     ##### Save everything after for loop.
     # ML 2020/04/06 S
     # Some inits
-    varmin, varmax = np.full(len_vars,np.nan), np.full(len_vars,np.nan)
-    varavg = np.zeros(len_vars)
+    stat_obj = Calc_data_stat(nvars)
     # ML 2020/04/06 E
     for j, im_file in enumerate(imageList):
         #20200408,Bing
@@ -52,47 +49,15 @@ def process_data(directory_to_process, target_dir, job_name, slices, vars=("T2",
         times = im.variables['time']
         time = num2date(times[:],units=times.units,calendar=times.calendar)
         vars_list = []
-        for i in range(len_vars):
-            im = Dataset(im_path, mode = 'r')
-            var1 = im.variables[vars[i]][0, :, :]
-            im.close()
-            var1 = var1[slices["lat_s"]:slices["lat_e"], slices["lon_s"]:slices["lon_e"]]
-            vars_list.append(var1)
-            # ML 2020/03/31: apply some statistics
-            varmin[i], varmax[i] = np.fmin(varmin[i],np.amin(var1)), np.fmax(varmax[i],np.amax(var1))
-            varavg[i] += np.average(var1)
-        # var2 = var2[slices["lat_e"]-slices["lat_s"],slices["lon_e"]-slices["lon_s"]]
-        # var3 = var3[slices["lat_e"]-slices["lat_s"],slices["lon_e"]-slices["lon_s"]]
-        #print(EU_t2.shape, EU_msl.shape, EU_gph500.shape)
-        #Normal stack: T2, MSL & GPH500
-        #EU_stack = np.stack([EU_t2, EU_msl, EU_gph500],axis=2)
-        #EU_stack_list[i]=EU_stack
-        #Stack T2 only:
-        #EU_stack = np.stack([EU_t2, EU_t2, EU_t2],axis=2)
-        #EU_stack_list[i]=EU_stack
-        #Stack T2*2 MSL*1:
-        #EU_stack = np.stack([EU_t2, EU_t2, EU_msl],axis=2)
-        #EU_stack_list[i]=EU_stack
-        #EU_stack = np.stack([EU_t2, EU_msl, EU_msl],axis=2)
-        #EU_stack_list[i]=EU_stack
-        #Stack T2*2 gph500*1:
-        #EU_stack = np.stack([EU_t2, EU_t2, EU_gph500],axis=2)
-        #EU_stack_list[i]=EU_stack
-        #Stack T2*1 gph500*2
-        #EU_stack = np.stack([EU_t2, EU_gph500, EU_gph500],axis=2)
-        #EU_stack_list[i]=EU_stack
-        #print(EU_stack.shape)
-        #X[i]=EU_stack #this should be unnecessary
-        #t2_1 stack. Stack t2 with two empty arrays
-        #empty_image = np.zeros(shape = (128, 160))
-        #EU_stack = np.stack([EU_t2, empty_image, empty_image],axis=2)
-        #EU_stack_list[i]=EU_stack
-        #t2_2 stack. Stack t2 with one empty array
-#       empty_image = np.zeros(shape = (64, 64))
-        #EU_stack = np.stack([var1, var2, var3], axis=2)
+        with Dataset(im_path,'r') as data_file:
+            for i in range(nvars):
+                var1 = data_file.variables[vars[i]][0,slices["lat_s"]:slices["lat_e"], slices["lon_s"]:slices["lon_e"]]
+                stat_obj.acc_stat_loc(i,var1)
+                vars_list.append(var1)
+
         EU_stack = np.stack(vars_list, axis = 2)
         EU_stack_list[j] =list(EU_stack)
-        #20200408,bing
         temporal_list[j] = list(time)
         #print('Does ist work? ')
@@ -111,28 +76,9 @@ def process_data(directory_to_process, target_dir, job_name, slices, vars=("T2",
         hkl.dump(X, target_file) #Not optimal!
         print(target_file, "is saved")
         # ML 2020/03/31: write json file with statistics
-    vars_uni, varsind = np.unique(vars,return_index=True)
-    nvars = len(vars_uni)
-
-    varmin, varmax, varavg = varmin[varsind], varmax[varsind], varavg[varsind]
-    stat_dict = {}
-    for i in range(nvars):
-        varavg[i] /= len(imageList)
-        print('varavg['+str(i)+'] : {0:5.2f}'.format(varavg[i]))
-        print('length of imageList: ',len(imageList))
-
-        stat_dict[vars_uni[i]]=[]
-        stat_dict[vars_uni[i]].append({
-            'min': varmin[i],
-            'max': varmax[i],
-            'avg': varavg[i]
-        })
-
-    js_file = os.path.join(target_dir,'stat_' + str(job_name) + '.json')
-    with open(js_file,'w') as stat_out:
-        json.dump(stat_dict, stat_out)
-    print(js_file+" was created successfully...")
-    #20200408:bing
+    stat_obj.finalize_stat_loc(vars)
+    stat_obj.write_stat_json(target_dir,file_id=job_name)
+    # BG 2020/04/08: Also save temporal information to pickle-files
     temporal_info = np.array(temporal_list)
     temporal_file = os.path.join(target_dir, 'T_' + str(job_name) + '.pkl')
     cwd = os.getcwd()
@@ -234,79 +180,244 @@ def split_data(target_dir, partition= [0.6, 0.2, 0.2]):
             pickle.dump(X_temporal,open(os.path.join(split_dir,"T_"+split + ".pkl"),"wb"))
     print ("PICKLE FILE FOR SPLITS SAVED")
-# ML 2020/04/03 S
-def get_stat(stat_dict,stat_name):
-    '''
-    Unpacks statistics dictionary and returns values of stat_name
-    '''
-
-    try:
-        return [stat_dict[i][0][stat_name] for i in [*stat_dict.keys()]]
-    except:
-        raise ValueError("Could not find "+stat_name+" for all variables of input dictionary.")
-
-# ML 2020/04/13 S
-def get_stat_allvars(stat_dict,stat_name,allvars):
-    '''
-    Retrieves requested statistics (stat_name) for all variables listed in allvars given statistics dictionary.
-    '''
-    vars_uni,indrev = np.unique(allvars,return_inverse=True)
+# ML 2020/05/15 S
+def get_unique_vars(varnames):
+    vars_uni, varsind = np.unique(varnames,return_index = True)
+    nvars_uni = len(vars_uni)
-
-    try:
-        return([stat_dict[var][0][stat_name] for var in vars_uni[indrev]])
-    except:
-        raise ValueError("Could not find "+stat_name+" for all variables of input dictionary.")
-
-# ML 2020/04/13: E
+    return(vars_uni, varsind, nvars_uni)
 
-def create_stat_json_master(target_dir,nnodes_active,vars):
-    '''
-    Reads all json-files created by slave nodes in 'process_data'-function (see above),
-    computes final statistics and writes them in final file to be used in subsequent steps.
-    '''
-
-
-    all_stat_files = glob.glob(target_dir+"/**/stat_*.json",recursive=True)
-
-
-    nfiles = len(all_stat_files)
-    if (nfiles < nnodes_active):
-        raise ValueError("Found less files than expected by number of active slave nodes!")
-
-
-
-    vars_uni = np.unique(vars)
-    nvars = len(vars_uni)
+class Calc_data_stat:
+    """Class for computing statistics and saving them to json-files."""
+
+    def __init__(self,nvars):
+        """
+        Initializes the instance for later use, i.e. initializes attributes with expected shape
+        """
+        self.stat_dict = {}
+        self.varmin = np.full((nvars,1),np.nan)    # avoid rank one-arrays
+        self.varmax = np.full((nvars,1),np.nan)
+        self.varavg = np.zeros((nvars,1))          # second dimension acts as placeholder for averaging on master node collecting json-files from slave nodes
+        self.nfiles = [0]                          # number of processed files
+        self.mode = ""                             # mode to distinguish between processing on slave and master nodes (sanity check)
+        self.jsfiles = [""]                        # list of processed json-files (master-mode only!)
+
+    def acc_stat_loc(self,ivar,data):
+        """
+        Performs accumulation of all statistics while looping through all data files (i.e. updates the statistics) on slave nodes
+        """
+        if not self.mode:
+            self.mode = "loc"
+        elif self.mode == "master":
+            raise ValueError("Cannot switch to loc-mode during runtime...")
+        else:
+            pass
+
+        self.varmin[ivar] = np.fmin(self.varmin[ivar],np.amin(data))
+        self.varmax[ivar] = np.fmax(self.varmax[ivar],np.amax(data))
+        self.varavg[ivar,0] += np.average(data)    # note that we sum the average -> readjustment required in the final step
+        if (ivar == 0): self.nfiles[0] += 1
+
+    def finalize_stat_loc(self,varnames):
+        """
+        Finalizes computation of statistics after going through all the data on slave nodes.
+        Afterwards the statistics dictionary is ready for being written in a json-file.
+        """
+
+        if self.mode != "loc":
+            raise ValueError("Object is not in loc-mode. Probably some master-method has been called previously.")
+
+        if self.stat_dict: raise ValueError("Statistics dictionary is not empty.")
+
+        vars_uni, varsind = np.unique(varnames,return_index=True)
+        nvars = len(vars_uni)
+
+        vars_uni, varsind, nvars = get_unique_vars(varnames)
-    varmin, varmax = np.full(nvars,np.nan), np.full(nvars,np.nan)    # initializes with NaNs -> make use of np.fmin/np.fmax subsequently
-    varavg = np.zeros(nvars)
+        varmin, varmax, varavg = self.varmin[varsind], self.varmax[varsind], self.varavg[varsind,0]
+
+        for i in range(nvars):
+            varavg[i] /= self.nfiles    # for adjusting the (summed) average
+
+            self.stat_dict[vars_uni[i]]=[]
+            self.stat_dict[vars_uni[i]].append({
+                'min': varmin[i,0].tolist(),
+                'max': varmax[i,0].tolist(),
+                'avg': varavg[i].tolist()
+            })
+        self.stat_dict["common_stat"] = [
+            {"nfiles":self.nfiles[0]}]
+
+    def acc_stat_master(self,file_dir,file_id):
+        """
+        Opens statistics-file (created by slave nodes) and accumulates its content.
+        """
+
+        if (int(file_id) <= 0): raise ValueError("Non-valid file_id passed.")
+
+        if not self.mode:
+            self.mode = "master"
+        elif self.mode == "loc":
+            raise ValueError("Cannot switch to master-mode during runtime...")
+        else:
+            pass
+
+        # sanity check: check if dictionary is initialized with unique values only
+        if self.stat_dict.keys() > set(self.stat_dict.keys()):
+            raise ValueError("Initialized dictionary contains duplicates of variables. Need unique collection instead.")
+        else:
+            pass
-    for ff in range(nfiles):
-        with open(all_stat_files[ff]) as js_file:
-            data = json.load(js_file)
-
-            varmin, varmax = np.fmin(varmin,get_stat(data,"min")), np.fmax(varmax,get_stat(data,"max"))
-            varavg += get_stat(data,"avg")
+        file_name = os.path.join(file_dir,"stat_{0:0=2d}.json".format(int(file_id)))
+
+        if not file_name in self.jsfiles:
+            print("Try to open: '"+file_name+"'")
-    # write final statistics
-    stat_dict = {}
-    for i in range(nvars):
-        stat_dict[vars_uni[i]]=[]
-        stat_dict[vars_uni[i]].append({
-            'min': varmin[i],
-            'max': varmax[i],
-            'avg': varavg[i]/nfiles
-
-        })
-
-    js_file = os.path.join(target_dir+"/splits",'statistics.json')
-    with open(js_file,'w') as stat_out:
-        json.dump(stat_dict, stat_out)
-    print(js_file+" was created successfully...")
+            try:
+                with open(file_name) as js_file:
+                    dict_in = json.load(js_file)
+
+                    # sanity check
+                    if (len(dict_in.keys()) -1 != len(self.varmin)):
+                        raise ValueError("Different number of variables found in json-file '"+js_file+"' than expected from statistics object.")
+
+                    self.varmin = np.fmin(self.varmin,Calc_data_stat.get_stat_allvars(dict_in,"min"))
+                    self.varmax = np.fmax(self.varmax,Calc_data_stat.get_stat_allvars(dict_in,"max"))
+
+                    if (np.all(self.varavg == 0.) or self.nfiles[0] == 0):
+                        self.varavg = Calc_data_stat.get_stat_allvars(dict_in,"avg")
+                        self.nfiles[0] = Calc_data_stat.get_common_stat(dict_in,"nfiles")
+                        self.jsfiles[0]= file_name
+                    else:
+                        self.varavg = np.append(self.varavg,Calc_data_stat.get_stat_allvars(dict_in,"avg"),axis=1)
+                        self.nfiles.append(Calc_data_stat.get_common_stat(dict_in,"nfiles"))
+                        self.jsfiles.append(file_name)
+            except IOError:
+                print("Cannot handle statistics file '"+file_name+"' to be processed.")
+            except ValueError:
+                print("Cannot retrieve all required statistics from '"+file_name+"'")
+        else:
+            print("Statistics file '"+file_name+"' has already been processed. Thus, just pass here...")
+            pass
-
+    def finalize_stat_master(self,path_out,vars_uni):
+        """
+        Performs final computation of statistics after accumulation from slave nodes.
+        """
+        if self.mode != "master":
+            raise ValueError("Object is not in master-mode. Probably some loc-method has been called previously.")
+
+        if len(vars_uni) > len(set(vars_uni)):
+            raise ValueError("Input variable names are not unique.")
+
+        js_file = os.path.join(path_out,"statistics.json")
+        nvars = len(vars_uni)
+        n_jsfiles = len(self.nfiles)
+        nfiles_all= np.sum(self.nfiles)
+        avg_wgt = np.array(self.nfiles,dtype=float)/float(nfiles_all)
+
+        varmin, varmax = self.varmin, self.varmax
+        varavg = np.sum(np.multiply(self.varavg,avg_wgt),axis=1)    # calculate weighted average
+
+        for i in range(nvars):
+            self.stat_dict[vars_uni[i]]=[]
+            self.stat_dict[vars_uni[i]].append({
+                'min': varmin[i,0].tolist(),
+                'max': varmax[i,0].tolist(),
+                'avg': varavg[i].tolist()
+            })
+        self.stat_dict["common_stat"] = [
+            {"nfiles": int(nfiles_all),
+             "jsfiles": self.jsfiles
+            }]
+
+    @staticmethod
+    def get_stat_allvars(stat_dict,stat_name):
+        """
+        Unpacks statistics dictionary and returns values of stat_name of all variables contained in the dictionary.
+        """
+
+        # some sanity checks
+        if not stat_dict: raise ValueError("Input dictionary is still empty! Cannot access anything from it.")
+        if not "common_stat" in stat_dict.keys(): raise ValueError("Input dictionary does not seem to be a proper statistics dictionary as common_stat-element is missing.")
+
+        stat_dict_filter = (stat_dict).copy()
+        stat_dict_filter.pop("common_stat")
+
+        if not stat_dict_filter.keys(): raise ValueError("Input dictionary does not contain any variables.")
+
+        try:
+            varstat = np.array([stat_dict_filter[i][0][stat_name] for i in [*stat_dict_filter.keys()]])
+            if np.ndim(varstat) == 1:    # avoid returning rank 1-arrays
+                return varstat.reshape(-1,1)
+            else:
+                return varstat
+        except:
+            raise ValueError("Could not find "+stat_name+" for all variables of input dictionary.")
+
+    @staticmethod
+    def get_stat_vars(stat_dict,stat_name,vars_in):
+        """
+        Retrieves requested statistics (stat_name) for all unique variables listed in vars_in given statistics dictionary.
+        If more than one unique variable is processed, this method returns a list, otherwise a scalar is returned.
+        """
+
+        if not stat_dict: raise ValueError("Statistics dictionary is still empty! Cannot access anything from it.")
+        if not "common_stat" in stat_dict.keys(): raise ValueError("Input dictionary does not seem to be a proper statistics dictionary as common_stat-element is missing.")
+
+        vars_uni,indrev = np.unique(vars_in,return_inverse=True)
+
+        try:
+            if len(vars_uni) > 1:
+                return([stat_dict[var][0][stat_name] for var in vars_uni[indrev]])
+            else:
+                return(stat_dict[vars_uni[0]][0][stat_name])
+        except:
+            raise ValueError("Could not find "+stat_name+" for all variables of input dictionary.")
+
+    @staticmethod
+    def get_common_stat(stat_dict,stat_name):
+
+        if not stat_dict: raise ValueError("Input dictionary is still empty! Cannot access anything from it.")
+        if not "common_stat" in stat_dict.keys(): raise ValueError("Input dictionary does not seem to be a proper statistics dictionary as common_stat-element is missing.")
+
+        common_stat_dict = stat_dict["common_stat"][0]
+
+        try:
+            return(common_stat_dict[stat_name])
+        except:
+            raise ValueError("Could not find "+stat_name+" in common_stat of input dictionary.")
+
+
+    def write_stat_json(self,path_out,file_id = -1):
+        """
+        Writes the statistics dictionary to a json-file (with the file_id in the output name on slave nodes).
+        If file_id is passed (and greater than 0), parallelized operation on a slave node is assumed.
+        Else: method is invoked from master node, i.e. the final json-file is created.
+        """
+        if (self.mode == "loc"):
+            if int(file_id) <= 0: raise ValueError("Object is in loc-mode, but no valid file_id passed")
+            # json-file from slave node
+            js_file = os.path.join(path_out,'stat_{0:0=2d}.json'.format(int(file_id)))
+        elif (self.mode == "master"):
+            if (int(file_id) > 0): print("Warning: Object is in master-mode, but a file_id was passed which will be ignored.")
+            # (final) json-file from master node
+            js_file = os.path.join(path_out,'statistics.json')
+        else:
+            raise ValueError("Object seems to be initialized only, but no data has been processed so far.")
+
+        try:
+            with open(js_file,'w') as stat_out:
+                json.dump(self.stat_dict,stat_out)
+        except ValueError:
+            print("Something went wrong when writing dictionary to json-file: '"+js_file+"'")
+        finally:
+            print("Created statistics json-file '"+js_file+"' successfully.")
+
+# ML 2020/05/15 E
-def split_data_multiple_years(target_dir,partition):
+
+
+def split_data_multiple_years(target_dir,partition,varnames):
     """
     Collect all the X_*.hkl data across years and split them to training, val and testing datatset
     """
@@ -315,6 +426,10 @@ def split_data_multiple_years(target_dir,partition):
     splits_dir = os.path.join(target_dir,"splits")
     os.makedirs(splits_dir, exist_ok=True)
     splits = {s: [] for s in list(partition.keys())}
+    # ML 2020/05/19 S
+    vars_uni, varsind, nvars = get_unique_vars(varnames)
+    stat_obj = Calc_data_stat(nvars)
+
     for split in partition.keys():
         values = partition[split]
         files = []
@@ -334,6 +449,8 @@ def split_data_multiple_years(target_dir,partition):
                 temporal_data = pickle.load(open(temporal_data_file,"rb"))
                 X = X + list(data)
                 Temporal_X = Temporal_X + list(temporal_data)
+                # process stat-file:
+                stat_obj.acc_stat_master(file_dir,int(month))
         X = np.array(X)
         Temporal_X = np.array(Temporal_X)
         print("==================={}=====================".format(split))
@@ -343,7 +460,10 @@ def split_data_multiple_years(target_dir,partition):
         hkl.dump(X, os.path.join(splits_dir , 'X_' + split + '.hkl'))
         pickle.dump(Temporal_X, open(os.path.join(splits_dir,"T_"+split + ".pkl"),"wb"))
         hkl.dump(files, os.path.join(splits_dir,'sources_' + split + '.hkl'))
-
+
+    # write final statistics json-file
+    stat_obj.finalize_stat_master(target_dir,vars_uni)
+    stat_obj.write_stat_json(splits_dir)
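
For reference, a minimal usage sketch of the new Calc_data_stat class in loc-mode, i.e. the per-node accumulation that process_data now performs. Only the class and its methods come from this diff; the variable list, the random stand-in fields, the output directory and the import path are hypothetical.

# Hypothetical loc-mode (slave node) usage of Calc_data_stat; stand-in data instead of netCDF slices.
import os
import numpy as np
from process_netCDF_v2 import Calc_data_stat   # assumes DataPreprocess/ is on the PYTHONPATH

varnames = ["T2", "MSL", "gph500"]             # hypothetical variable list
stat_obj = Calc_data_stat(len(varnames))       # one slot per entry of varnames

for _ in range(10):                            # stand-in for the loop over netCDF files
    for ivar in range(len(varnames)):
        field = np.random.rand(64, 64)         # stand-in for the sliced 2D field of variable ivar
        stat_obj.acc_stat_loc(ivar, field)     # update running min/max and the summed average

out_dir = "/tmp/procData"                      # hypothetical output directory
os.makedirs(out_dir, exist_ok=True)
stat_obj.finalize_stat_loc(varnames)           # reduce to unique variables, adjust the summed averages
stat_obj.write_stat_json(out_dir, file_id=1)   # loc-mode -> writes stat_01.json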
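Likewise, a sketch of the master-side aggregation now wired into split_data_multiple_years: the stat_XX.json files written by the individual jobs are accumulated and a single statistics.json is written next to the split files. Directory names and the job/month ids are placeholders, and the per-job stat-files from the loc-mode step are assumed to already exist.

# Hypothetical master-mode usage of Calc_data_stat, mirroring split_data_multiple_years.
import os
from process_netCDF_v2 import Calc_data_stat, get_unique_vars   # assumes DataPreprocess/ is on the PYTHONPATH

varnames = ["T2", "MSL", "gph500"]                  # hypothetical variable list
vars_uni, varsind, nvars = get_unique_vars(varnames)
stat_obj = Calc_data_stat(nvars)                    # master object works on unique variables only

stat_dir = "/tmp/procData/2017"                     # hypothetical directory holding stat_01.json, stat_02.json, ...
for month in [1, 2]:                                # placeholder job ids (months)
    stat_obj.acc_stat_master(stat_dir, month)       # accumulate one stat_<month>.json per call

splits_dir = "/tmp/procData/splits"                 # hypothetical splits directory
os.makedirs(splits_dir, exist_ok=True)
stat_obj.finalize_stat_master("/tmp/procData", vars_uni)   # weighted average over all accumulated files
stat_obj.write_stat_json(splits_dir)                # master-mode -> writes statistics.json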