diff --git a/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py b/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py
index 421d2a6e9cbc052d5545e3c64048ea2ba8f86c1b..30a6ac25cebe365d90a0c2ef60271ab20a3cdb29 100644
--- a/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py
+++ b/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py
@@ -16,6 +16,7 @@ sys.path.append(path.abspath('../../workflow_parallel_frame_prediction/'))
 import DataPreprocess.process_netCDF_v2
 from DataPreprocess.process_netCDF_v2 import get_unique_vars
 from DataPreprocess.process_netCDF_v2 import Calc_data_stat
+from metadata import MetaData
 #from base_dataset import VarLenFeatureVideoDataset
 from collections import OrderedDict
 from tensorflow.contrib.training import HParams
@@ -336,6 +337,7 @@ def write_sequence_file(output_dir,seq_length):
 
 
 def main():
+    print("I'm alive...")
     parser = argparse.ArgumentParser()
     parser.add_argument("input_dir", type=str, help="directory containing the processed directories ""boxing, handclapping, handwaving, ""jogging, running, walking")
     parser.add_argument("output_dir", type=str)
@@ -359,51 +361,49 @@ def main():
 #                "2010_1":[1,2,3,4,5,6,7,8,9,10,11,12],
 #                "2012":[1,2,3,4,5,6,7,8,9,10,11,12],
 #                "2013_complete":[1,2,3,4,5,6,7,8,9,10,11,12],
-                "2015":[1,2,3,4,5,6,7,8,9,10,11,12],
-#                "2017":[1,2,3,4,5,6,7,8,9,10,11,12]
+#                "2015":[1,2,3,4,5,6,7,8,9,10,11,12],
+                "2017":[1,2,3,4,5,6,7,8,9,10]
                 },
            "val":
-                {"2016":[1,2,3,4,5,6,7,8,9,10,11,12]
+                {"2017":[11]
                 },
            "test":
-                {"2017":[1,2,3,4,5,6,7,8,9,10,11,12]
+                {"2017":[12]
                 }
            }
-    # working section of the script:
-
-    # retrieve final statistics first (not parallelized!)
-    # some preparatory steps
-    stat_dir_prefix = os.path.join(MetaData.get_destdir_jsontmp(),"pickle")
-    varnames = args.variables
-
-    vars_uni, varsind, nvars = get_unique_vars(varnames)
-    stat_obj = Calc_data_stat(nvars)    # init statistic-instance
-
-    # loop over whole data set (training, dev and test set) to collect the intermediate statistics
-    for split in partition.keys():
-        values = partition[split]
-        for year in values.keys():
-            file_dir = os.path.join(stat_dir_prefix,year)
-            for month in values[year]:
-                # process stat-file:
-                stat_obj.acc_stat_master(file_dir,int(month))   # process monthly statistic-file
-
-    # finalize statistics and write to json-file
-    stat_obj.finalize_stat_master(vars_uni)
-    stat_obj.write_stat_json(args.input_dir)
-
-    # open statistics file and feed it to norm-instance
-    with open(os.path.join(args.input_dir,"statistics.json")) as js_file:
-        stats = json.load(js_file)
-
     # ini. MPI
+    print("Hello dude...")
     comm = MPI.COMM_WORLD
     my_rank = comm.Get_rank()  # rank of the node
     p = comm.Get_size()  # number of assigned nods
-
+
+    print("Hello dude 2...")
     if my_rank == 0 :
+        print("Hello, I'm master")
+        # retrieve final statistics first (not parallelized!)
+        # some preparatory steps
+        stat_dir_prefix = args.input_dir
+        varnames = args.variables
+
+        vars_uni, varsind, nvars = get_unique_vars(varnames)
+        stat_obj = Calc_data_stat(nvars)    # init statistic-instance
+
+        # loop over the whole data set (training, dev and test set) to collect the intermediate statistics
+        print("Start collecting statistics from the whole dataset to be processed...")
+        for split in partition.keys():
+            values = partition[split]
+            for year in values.keys():
+                file_dir = os.path.join(stat_dir_prefix,year)
+                for month in values[year]:
+                    # process stat-file:
+                    stat_obj.acc_stat_master(file_dir,int(month))   # process monthly statistic-file
+
+        # finalize statistics and write to json-file
+        stat_obj.finalize_stat_master(vars_uni)
+        stat_obj.write_stat_json(args.input_dir)
+
         partition_year_month = [] #contain lists of list, each list includes three element [train,year,month]
         partition_names = list(partition.keys())
         print ("partition_names:",partition_names)
@@ -430,6 +430,10 @@ def main():
         message_in = comm.recv()
         print ("My rank,", my_rank)
         print("message_in",message_in)
+        # open statistics file and feed it to norm-instance
+        print("Opening json-file: "+os.path.join(args.input_dir,"statistics.json"))
+        with open(os.path.join(args.input_dir,"statistics.json")) as js_file:
+            stats = json.load(js_file)
         #loop the partitions (train,val,test)
         for partition in message_in:
             print("partition on slave ",partition)
@@ -439,7 +443,7 @@ def main():
                 input_file = "X_" + '{0:02}'.format(my_rank) + ".pkl"
                 input_dir = os.path.join(args.input_dir,year)
                 input_file = os.path.join(input_dir,input_file)
-                #read_frames_and_save_tf_records(year=year,month=my_rank,stats=stats,output_dir=save_output_dir,input_file=input_file,vars_in=args.variables,partition_name=partition_name, seq_length=args.seq_length,height=args.height,width=args.width,sequences_per_file=2)
+                read_frames_and_save_tf_records(year=year,month=my_rank,stats=stats,output_dir=save_output_dir,input_file=input_file,vars_in=args.variables,partition_name=partition_name, seq_length=args.seq_length,height=args.height,width=args.width,sequences_per_file=20)
             print("Year {} finished",year)
             message_out = ("Node:",str(my_rank),"finished","","\r\n")
             print ("Message out for slaves:",message_out)
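Net effect of the hunks above: the statistics pass that previously ran unconditionally in `main()` now runs only on the master rank, which writes `statistics.json` before any work is handed out, and each slave loads that file right after its blocking `comm.recv()` returns. The sketch below illustrates that ordering as a minimal, self-contained mpi4py program; it assumes at least two MPI ranks, and `process_month()` plus the dummy statistics dict are hypothetical stand-ins for `read_frames_and_save_tf_records()` and the `Calc_data_stat` output.

```python
# Minimal sketch of the master/slave ordering used in the patch above.
# Assumptions: mpi4py is installed and the job runs with >= 2 ranks,
# e.g. mpirun -np 4 python sketch.py. process_month() and the dummy
# statistics are hypothetical stand-ins for the real helpers.
import json
import os

from mpi4py import MPI


def process_month(year, month, stats):
    """Hypothetical stand-in for read_frames_and_save_tf_records()."""
    print("processing {0}-{1:02d} with stats {2}".format(year, month, stats))


def main():
    comm = MPI.COMM_WORLD
    my_rank = comm.Get_rank()
    p = comm.Get_size()
    stats_file = os.path.join(".", "statistics.json")

    if my_rank == 0:
        # Master: write the final statistics *before* distributing any work,
        # mirroring finalize_stat_master()/write_stat_json() above.
        with open(stats_file, "w") as js_file:
            json.dump({"T2": {"min": 235.2, "max": 321.2}}, js_file)
        # Hand the (year, month) tasks round-robin to the slaves.
        tasks = [("2017", month) for month in range(1, 13)]
        for rank in range(1, p):
            comm.send(tasks[rank - 1::p - 1], dest=rank)
        # Collect the "finished" messages.
        for rank in range(1, p):
            print(comm.recv(source=rank))
    else:
        # Slave: the blocking recv() returns only after the master has sent,
        # so statistics.json is guaranteed to exist when we open it.
        my_tasks = comm.recv(source=0)
        with open(stats_file) as js_file:
            stats = json.load(js_file)
        for year, month in my_tasks:
            process_month(year, month, stats)
        comm.send("node {} finished".format(my_rank), dest=0)


if __name__ == "__main__":
    main()
```

As long as the master sends its task messages only after `write_stat_json()` has completed, which the rank-0 branch in the patch does, moving the `statistics.json` read into the slave branch introduces no race between the writer and the readers.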