diff --git a/video_prediction_savp/HPC_scripts/DataExtraction.sh b/video_prediction_savp/HPC_scripts/DataExtraction.sh
index 3d868b9cc923e9e0bcd0d1e1e24e69b9e8705a4f..b44065e7babb0411cda6d2849ec429f3672c60d5 100755
--- a/video_prediction_savp/HPC_scripts/DataExtraction.sh
+++ b/video_prediction_savp/HPC_scripts/DataExtraction.sh
@@ -38,7 +38,7 @@ dest_dir="/p/scratch/deepacf/video_prediction_shared_folder/extractedData/"
 year="2010"
 
 # Run data extraction
-srun python ../../workflow_parallel_frame_prediction/DataExtraction/mpi_stager_v2.py --source_dir ${source_dir}/${year} --destination_dir ${dest_dir}/${year}/
+srun python ../../workflow_parallel_frame_prediction/DataExtraction/mpi_stager_v2.py --source_dir ${source_dir}/${year}/ --destination_dir ${dest_dir}/${year}/
 
 
 
diff --git a/video_prediction_savp/HPC_scripts/DataPreprocess.sh b/video_prediction_savp/HPC_scripts/DataPreprocess.sh
index a9bd46413abaca1d2df3670b0acbcc8d32c919a6..aa84de9de7dce7015b26f040aaec48d0b096a816 100755
--- a/video_prediction_savp/HPC_scripts/DataPreprocess.sh
+++ b/video_prediction_savp/HPC_scripts/DataPreprocess.sh
@@ -12,11 +12,21 @@
 #SBATCH --mail-type=ALL
 #SBATCH --mail-user=b.gong@fz-juelich.de
 
+
+# Name of virtual environment
+VIRT_ENV_NAME="virt_env_hdfml"
+
+# Activate virtual environment if needed (and possible)
 if [ -z ${VIRTUAL_ENV} ]; then
-  echo "Please activate a virtual environment..."
-  exit 1
+  if [[ -f ../${VIRT_ENV_NAME}/bin/activate ]]; then
+    echo "Activating virtual environment..."
+    source ../${VIRT_ENV_NAME}/bin/activate
+  else
+    echo "ERROR: Requested virtual environment ${VIRT_ENV_NAME} not found..."
+    exit 1
+  fi
 fi
-
+# Load modules
 source ../env_setup/modules_preprocess.sh
 
 source_dir=${SAVE_DIR}/extractedData
diff --git a/video_prediction_savp/HPC_scripts/DataPreprocess_to_tf.sh b/video_prediction_savp/HPC_scripts/DataPreprocess_to_tf.sh
index 273863179bdfb673ff79991708aa69fc415b8997..bcf950e93145bcc8b0d15892a606d4cc5d7dd66e 100755
--- a/video_prediction_savp/HPC_scripts/DataPreprocess_to_tf.sh
+++ b/video_prediction_savp/HPC_scripts/DataPreprocess_to_tf.sh
@@ -29,8 +29,8 @@ if [ -z ${VIRTUAL_ENV} ]; then
 fi
 
 # declare directory-variables which will be modified appropriately during Preprocessing (invoked by mpi_split_data_multi_years.py)
-source_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/era5-Y2015to2017M01to12-160x128-2970N1500W-T2_MSL_gph500
-destination_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/era5-Y2015to2017M01to12-160x128-2970N1500W-T2_MSL_gph500
+source_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/
+destination_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/
 
 # run Preprocessing (step 2 where Tf-records are generated)
-srun python ../video_prediction/datasets/era5_dataset_v2.py ${source_dir}/hickle ${destination_dir}/tfrecords -vars T2 MSL gph500 -height 128 -width 160 -seq_length 20
+srun python ../video_prediction/datasets/era5_dataset_v2.py ${source_dir}/pickle ${destination_dir}/tfrecords -vars T2 MSL gph500 -height 128 -width 160 -seq_length 20
diff --git a/video_prediction_savp/HPC_scripts/generate_era5.sh b/video_prediction_savp/HPC_scripts/generate_era5.sh
index 9041a16fcb596ffec4a08de80d30dccb954b60ed..bb36609129d2c45c5c0cbfaaf0675a7e338eb09c 100755
--- a/video_prediction_savp/HPC_scripts/generate_era5.sh
+++ b/video_prediction_savp/HPC_scripts/generate_era5.sh
@@ -29,12 +29,12 @@ if [ -z ${VIRTUAL_ENV} ]; then
    fi
 fi
-
 # 
declare directory-variables which will be modified appropriately during Preprocessing (invoked by mpi_split_data_multi_years.py) -source_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/era5-Y2015to2017M01to12-160x128-2970N1500W-T2_MSL_gph500 -checkpoint_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/models/era5-Y2015to2017M01to12-160x128-2970N1500W-T2_MSL_gph500 -results_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/results/era5-Y2015to2017M01to12-160x128-2970N1500W-T2_MSL_gph500 -#TODO: remove this after testing +source_dir=/p/scratch/deepacf/video_prediction_shared_folder/preprocessedData/ +checkpoint_dir=/p/scratch/deepacf/video_prediction_shared_folder/models/ +results_dir=/p/scratch/deepacf/video_prediction_shared_folder/results/ + +# name of model model=convLSTM # run postprocessing/generation of model results including evaluation metrics diff --git a/video_prediction_savp/HPC_scripts/train_era5.sh b/video_prediction_savp/HPC_scripts/train_era5.sh index 6baafa8a9f3f3c9164759ebe8aa6490f9555b733..f605866056f6b2d9fa179a00850468fee0c72d87 100755 --- a/video_prediction_savp/HPC_scripts/train_era5.sh +++ b/video_prediction_savp/HPC_scripts/train_era5.sh @@ -34,8 +34,8 @@ fi # declare directory-variables which will be modified appropriately during Preprocessing (invoked by mpi_split_data_multi_years.py) -source_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/era5-Y2015to2017M01to12-160x128-2970N1500W-T2_MSL_gph500 -destination_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/models/era5-Y2015to2017M01to12-160x128-2970N1500W-T2_MSL_gph500 +source_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/preprocessedData/ +destination_dir=/p/project/deepacf/deeprain/video_prediction_shared_folder/models/ # for choosing the model model=convLSTM diff --git a/video_prediction_savp/env_setup/modules_train.sh b/video_prediction_savp/env_setup/modules_train.sh index c60ec66c4b91837094478e0288dd7518edd44e17..cdbc436e1976773132aa636a3f1cedfa506d3a14 100755 --- a/video_prediction_savp/env_setup/modules_train.sh +++ b/video_prediction_savp/env_setup/modules_train.sh @@ -21,6 +21,5 @@ module load mpi4py/3.0.1-Python-3.6.8 module load h5py/2.9.0-serial-Python-3.6.8 module load TensorFlow/1.13.1-GPU-Python-3.6.8 module load cuDNN/7.5.1.10-CUDA-10.1.105 - module load netcdf4-python/1.5.0.1-Python-3.6.8 diff --git a/video_prediction_savp/env_setup/requirements.txt b/video_prediction_savp/env_setup/requirements.txt index 76dd1f57d64577cc565968bb7106656e53687261..4bf2f0b25d082c4c503bbd56f46d28360a48df43 100644 --- a/video_prediction_savp/env_setup/requirements.txt +++ b/video_prediction_savp/env_setup/requirements.txt @@ -2,3 +2,4 @@ opencv-python scipy scikit-image pandas +hickle diff --git a/video_prediction_savp/metadata.py b/video_prediction_savp/metadata.py index da0beb4f571c4ed9a81d94f0cbadfcce53ff02a0..8f61d5766169c08d793b665253a8cd1e86a80548 100644 --- a/video_prediction_savp/metadata.py +++ b/video_prediction_savp/metadata.py @@ -4,6 +4,7 @@ Classes and routines to retrieve and handle meta-data import os import sys +import time import numpy as np import json from netCDF4 import Dataset @@ -172,7 +173,8 @@ class MetaData: meta_fname = os.path.join(dest_dir,"metadata.json") - if os.path.exists(meta_fname): # check if a metadata-file already exists and check its content + if os.path.exists(meta_fname): # check if a metadata-file already exists and check its content + print(method_name+": 
json-file ('"+meta_fname+"' already exists. Its content will be checked...") self.status = "old" # set status to old in order to prevent repeated modification of shell-/Batch-scripts with open(meta_fname,'r') as js_file: dict_dupl = json.load(js_file) @@ -241,6 +243,7 @@ class MetaData: def write_destdir_jsontmp(dest_dir, tmp_dir = None): """ Writes dest_dir to temporary json-file (temp.json) stored in the current working directory. + To be executed by Master node in parallel mode. """ if not tmp_dir: tmp_dir = os.getcwd() @@ -276,6 +279,34 @@ class MetaData: else: return(dict_tmp.get("destination_dir")) + @staticmethod + def wait_for_jsontmp(tmp_dir = None, waittime = 10, delay=0.5): + """ + Waits at max. waittime (in sec) until temp.json-file becomes available + """ + + method_name = MetaData.wait_for_jsontmp.__name__+" of Class "+MetaData.__name__ + + if not tmp_dir: tmp_dir = os.getcwd() + + file_tmp = os.path.join(tmp_dir,"temp.json") + + counter_max = waittime/delay + counter = 0 + status = "not_ok" + + while (counter <= counter_max): + if os.path.isfile(file_tmp): + status = "ok" + break + else: + time.sleep(delay) + + counter += 1 + + if status != "ok": raise IOError(method_name+": '"+file_tmp+ \ + "' does not exist after waiting for "+str(waittime)+" sec.") + @staticmethod def issubset(a,b): diff --git a/video_prediction_savp/scripts/generate_transfer_learning_finetune.py b/video_prediction_savp/scripts/generate_transfer_learning_finetune.py index 79a0db1379bd4ff31f371d593e084e7c142956cf..fb61f5f123fd55bcfc8a44652843ee70ba4dfcb8 100644 --- a/video_prediction_savp/scripts/generate_transfer_learning_finetune.py +++ b/video_prediction_savp/scripts/generate_transfer_learning_finetune.py @@ -387,6 +387,7 @@ def main(): sess.graph.as_default() sess.run(tf.global_variables_initializer()) sess.run(tf.local_variables_initializer()) + model.restore(sess, args.checkpoint) #model.restore(sess, args.checkpoint)#Bing: Todo: 20200728 Let's only focus on true and persistend data sample_ind, gen_images_all, persistent_images_all, input_images_all = initia_save_data() diff --git a/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py b/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py index 69e2b6ad087ad3522540e802ace487bbbc13ad56..a3c9fc3666eb21ef02f9e5f64c8c95a29034d619 100644 --- a/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py +++ b/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py @@ -16,6 +16,7 @@ sys.path.append(path.abspath('../../workflow_parallel_frame_prediction/')) import DataPreprocess.process_netCDF_v2 from DataPreprocess.process_netCDF_v2 import get_unique_vars from DataPreprocess.process_netCDF_v2 import Calc_data_stat +from metadata import MetaData #from base_dataset import VarLenFeatureVideoDataset from collections import OrderedDict from tensorflow.contrib.training import HParams @@ -61,7 +62,6 @@ class ERA5Dataset_v2(VarLenFeatureVideoDataset): sequence_lengths = [int(sequence_length.strip()) for sequence_length in sequence_lengths] return np.sum(np.array(sequence_lengths) >= self.hparams.sequence_length) - def filter(self, serialized_example): return tf.convert_to_tensor(True) @@ -253,9 +253,9 @@ class Norm_data: def read_frames_and_save_tf_records(stats,output_dir,input_file, temp_input_file, vars_in,year,month,seq_length=20,sequences_per_file=128,height=64,width=64,channels=3,**kwargs):#Bing: original 128 """ - Read hickle/pickle files based on month, to process and save to tfrecords - stats:dict, contains the 
stats information from hickle directory, - input_file: string, absolute path to hickle/pickle file + Read pickle files based on month, to process and save to tfrecords + stats:dict, contains the stats information from pickle directory, + input_file: string, absolute path to pickle file file_info: 1D list with three elements, partition_name(train,val or test), year, and month e.g.[train,1,2] """ # ML 2020/04/08: @@ -281,9 +281,6 @@ def read_frames_and_save_tf_records(stats,output_dir,input_file, temp_input_file T_start_points = [] sequence_iter = 0 #sequence_lengths_file = open(os.path.join(output_dir, 'sequence_lengths.txt'), 'w') - # ML 2020/07/15: Make use of pickle-files only - #with open(os.path.join(input_dir, "X_" + partition_name + ".pkl"), "rb") as data_file: - # X_train = pickle.load(data_file) #Bing 2020/07/16 #print ("open intput dir,",input_file) with open(input_file, "rb") as data_file: @@ -328,7 +325,7 @@ def read_frames_and_save_tf_records(stats,output_dir,input_file, temp_input_file #sequence_lengths_file.close() return -def write_sequence_file(output_dir,seq_length): +def write_sequence_file(output_dir,seq_length,sequences_per_file): partition_names = ["train","val","test"] for partition_name in partition_names: @@ -336,7 +333,7 @@ def write_sequence_file(output_dir,seq_length): tfCounter = len(glob.glob1(save_output_dir,"*.tfrecords")) print("Partition_name: {}, number of tfrecords: {}".format(partition_name,tfCounter)) sequence_lengths_file = open(os.path.join(save_output_dir, 'sequence_lengths.txt'), 'w') - for i in range(tfCounter): + for i in range(tfCounter*sequences_per_file): sequence_lengths_file.write("%d\n" % seq_length) sequence_lengths_file.close() @@ -352,41 +349,62 @@ def main(): parser.add_argument("-height",type=int,default=64) parser.add_argument("-width",type = int,default=64) parser.add_argument("-seq_length",type=int,default=20) + parser.add_argument("-sequences_per_file",type=int,default=2) args = parser.parse_args() current_path = os.getcwd() #input_dir = "/Users/gongbing/PycharmProjects/video_prediction/splits" #output_dir = "/Users/gongbing/PycharmProjects/video_prediction/data/era5" #partition_names = ['train','val', 'test'] #64,64,3 val has issue# + ############################################################ + # CONTROLLING variable! Needs to be adapted manually!!! + ############################################################ partition = { "train":{ # "2222":[1,2,3,5,6,7,8,9,10,11,12], # "2010_1":[1,2,3,4,5,6,7,8,9,10,11,12], # "2012":[1,2,3,4,5,6,7,8,9,10,11,12], # "2013_complete":[1,2,3,4,5,6,7,8,9,10,11,12], - "2015":[1,2,3,4,5,6,7,8,9,10,11,12], - # "2017":[1,2,3,4,5,6,7,8,9,10,11,12] + # "2015":[1,2,3,4,5,6,7,8,9,10,11,12], + "2017_test":[1,2,3,4,5,6,7,8,9,10] }, "val": - {"2016":[1,2,3,4,5,6,7,8,9,10,11,12] + {"2017_test":[11] }, "test": - {"2017":[1,2,3,4,5,6,7,8,9,10,11,12] + {"2017_test":[12] } } - # open statistics file and feed it to norm-instance - with open(os.path.join(args.input_dir,"statistics.json")) as js_file: - stats = json.load(js_file) - - #TODO: search all the statistic json file correspdoing to the parition and generate a general statistic.json for normalization - # ini. MPI comm = MPI.COMM_WORLD my_rank = comm.Get_rank() # rank of the node p = comm.Get_size() # number of assigned nods - + if my_rank == 0 : + # retrieve final statistics first (not parallelized!) 
+        # some preparatory steps
+        stat_dir_prefix = args.input_dir
+        varnames = args.variables
+
+        vars_uni, varsind, nvars = get_unique_vars(varnames)
+        stat_obj = Calc_data_stat(nvars)  # init statistic-instance
+
+        # loop over whole data set (training, dev and test set) to collect the intermediate statistics
+        print("Start collecting statistics from the whole dataset to be processed...")
+        for split in partition.keys():
+            values = partition[split]
+            for year in values.keys():
+                file_dir = os.path.join(stat_dir_prefix,year)
+                for month in values[year]:
+                    # process stat-file:
+                    stat_obj.acc_stat_master(file_dir,int(month))  # process monthly statistic-file
+
+        # finalize statistics and write to json-file
+        stat_obj.finalize_stat_master(vars_uni)
+        stat_obj.write_stat_json(args.input_dir)
+
+        # organize parallelized partitioning
         partition_year_month = [] #contain lists of list, each list includes three element [train,year,month]
         partition_names = list(partition.keys())
         print ("partition_names:",partition_names)
@@ -406,12 +424,17 @@
             message_counter = message_counter + 1
             print("Message in from slaver",message_in)
 
-        write_sequence_file(args.output_dir,args.seq_length)
+        write_sequence_file(args.output_dir,args.seq_length,args.sequences_per_file)
+        #write_sequence_file
 
     else:
        message_in = comm.recv()
        print ("My rank,", my_rank)
        print("message_in",message_in)
+        # open statistics file and feed it to norm-instance
+        print("Opening json-file: "+os.path.join(args.input_dir,"statistics.json"))
+        with open(os.path.join(args.input_dir,"statistics.json")) as js_file:
+            stats = json.load(js_file)
        #loop the partitions (train,val,test)
        for partition in message_in:
            print("partition on slave ",partition)
@@ -423,7 +446,11 @@
            input_dir = os.path.join(args.input_dir,year)
            temp_file = os.path.join(input_dir,temp_file )
            input_file = os.path.join(input_dir,input_file)
-            read_frames_and_save_tf_records(year=year,month=my_rank,stats=stats,output_dir=save_output_dir,input_file=input_file,temp_input_file=temp_file,vars_in=args.variables,partition_name=partition_name,seq_length=args.seq_length,height=args.height,width=args.width,sequences_per_file=2)
+            # create the tfrecords-files
+            read_frames_and_save_tf_records(year=year,month=my_rank,stats=stats,output_dir=save_output_dir, \
+                                            input_file=input_file,temp_input_file=temp_file,vars_in=args.variables, \
+                                            partition_name=partition_name,seq_length=args.seq_length, \
+                                            height=args.height,width=args.width,sequences_per_file=args.sequences_per_file)
            print("Year {} finished",year)
            message_out = ("Node:",str(my_rank),"finished","","\r\n")
diff --git a/video_prediction_savp/video_prediction/models/vanilla_convLSTM_model.py b/video_prediction_savp/video_prediction/models/vanilla_convLSTM_model.py
index c7f3db7ce4fce732312eba0d9f17362faa2e64b5..7560a225e7651728e2ca8d2107d7f32458106c86 100644
--- a/video_prediction_savp/video_prediction/models/vanilla_convLSTM_model.py
+++ b/video_prediction_savp/video_prediction/models/vanilla_convLSTM_model.py
@@ -41,7 +41,7 @@ class VanillaConvLstmVideoPredictionModel(BaseVideoPredictionModel):
         lr: learning rate. if decay steps is non-zero, this is the
             learning rate for steps <= decay_step.
         max_steps: number of training steps.
-        context_frames: the number of ground-truth frames to pass :qin at
sequence_length: the number of frames in the video sequence, including the context frames, so this model predicts diff --git a/workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py b/workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py index fdc2f65a5092469c021e0c21b0606e7e7d248c5c..71c661c49ba3502b12dbd409fb76a5c9b4517087 100755 --- a/workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py +++ b/workflow_parallel_frame_prediction/DataPreprocess/mpi_stager_v2_process_netCDF.py @@ -109,7 +109,6 @@ def main(): # Expand destination_dir-variable by searching for netCDF-files in source_dir and processing the file from the first list element to obtain all relevant (meta-)data. if my_rank == 0: data_files_list = glob.glob(source_dir+"/**/*.nc",recursive=True) - if not data_files_list: raise ValueError("Could not find any data to be processed in '"+source_dir+"'") md = MetaData(suffix_indir=destination_dir,data_filename=data_files_list[0],slices=slices,variables=vars) @@ -118,15 +117,26 @@ def main(): md.write_dirs_to_batch_scripts(scr_dir+"/DataPreprocess_to_tf.sh") md.write_dirs_to_batch_scripts(scr_dir+"/generate_era5.sh") md.write_dirs_to_batch_scripts(scr_dir+"/train_era5.sh") - # ML 2020/06/08: Dirty workaround as long as data-splitting is done with a seperate Python-script - # called from the same parent Shell-/Batch-script - # -> work with temproary json-file in working directory - md.write_destdir_jsontmp(os.path.join(md.expdir,md.expname),tmp_dir=current_path) - #else: nothing to do + + elif (md.status == "old"): # meta-data file already exists and is ok + # check for temp.json in working directory (required by slave nodes) + tmp_file = os.path.join(current_path,"temp.json") + if os.path.isfile(tmp_file): + os.remove(tmp_file) + mess_tmp_file = "Auxiliary file '"+tmp_file+"' already exists, but is cleaned up to be updated" + \ + " for safety reasons." + logging.info(mess_tmp_file) + + # ML 2019/06/08: Dirty workaround as long as data-splitting is done with a seperate Python-script + # called from the same parent Shell-/Batch-script + # -> work with temproary json-file in working directory + # create or update temp.json, respectively + md.write_destdir_jsontmp(os.path.join(md.expdir, md.expname), tmp_dir=current_path) - destination_dir= os.path.join(md.expdir,md.expname,"hickle",years) + # expand destination directory by pickle-subfolder and... + destination_dir= os.path.join(md.expdir,md.expname,"pickle",years) - # ...and create directory if necessary + # ...create directory if necessary if not os.path.exists(destination_dir): # check if the Destination dir. 
is existing
            logging.critical('The Destination does not exist')
            logging.info('Create new destination dir')
@@ -218,7 +228,7 @@
        #process_era5_in_dir(job, src_dir=source_dir, target_dir=destination_dir)
        # ML 2020/06/09: workaround to get correct destination_dir obtained by the master node
-        destination_dir = os.path.join(MetaData.get_destdir_jsontmp(tmp_dir=current_path),"hickle",years)
+        destination_dir = os.path.join(MetaData.get_destdir_jsontmp(tmp_dir=current_path),"pickle",years)
        process_netCDF_in_dir(job_name=job, src_dir=source_dir, target_dir=destination_dir,slices=slices,vars=vars)
 
        if checksum_status == 1:
diff --git a/workflow_parallel_frame_prediction/DataPreprocess/process_netCDF_v2.py b/workflow_parallel_frame_prediction/DataPreprocess/process_netCDF_v2.py
index 0c0024af3a2928ba4ec1cdc1dddb5f2ac84a133d..1a66fa51011174ab2d8adb2a4eb6b3b0ae3b4a7e 100644
--- a/workflow_parallel_frame_prediction/DataPreprocess/process_netCDF_v2.py
+++ b/workflow_parallel_frame_prediction/DataPreprocess/process_netCDF_v2.py
@@ -17,6 +17,14 @@ import pickle
 # Create image datasets.
 # Processes images and saves them in train, val, test splits.
 def process_data(directory_to_process, target_dir, job_name, slices, vars=("T2","MSL","gph500")):
+    '''
+    :param directory_to_process: directory where netCDF-files are stored to be processed
+    :param target_dir: directory where pickle-files will be stored
+    :param job_name: job_id passed and organized by PyStager
+    :param slices: indices defining geographical region of interest
+    :param vars: variables to be processed
+    :return: Saves pickle-files which contain the sliced meteorological data and temporal information as well
+    '''
     desired_im_sz = (slices["lat_e"] - slices["lat_s"], slices["lon_e"] - slices["lon_s"])
     # ToDo: Define a convenient function to create a list containing all files.
     imageList = list(os.walk(directory_to_process, topdown = False))[-1][-1]
@@ -24,28 +32,19 @@ def process_data(directory_to_process, target_dir, job_name, slices, vars=("T2",
     EU_stack_list = [0] * (len(imageList))
     temporal_list = [0] * (len(imageList))
     nvars = len(vars)
-    #X = np.zeros((len(splits[split]),) + desired_im_sz + (3,), np.uint8)
-    #print(X)
-    #print('shape of X' + str(X.shape))
-    ##### TODO: iterate over split and read every .nc file, cut out array,
-    ##### overlay arrays for RGB like style.
-    ##### Save everything after for loop.
     # ML 2020/04/06 S
     # Some inits
     stat_obj = Calc_data_stat(nvars)
     # ML 2020/04/06 E
     for j, im_file in enumerate(imageList):
-        #20200408,Bing
         try:
             im_path = os.path.join(directory_to_process, im_file)
             print('Open following dataset: '+im_path)
-            #20200408,Bing
-            im = Dataset(im_path, mode = 'r')
-            times = im.variables['time']
-            time = num2date(times[:],units=times.units,calendar=times.calendar)
             vars_list = []
             with Dataset(im_path,'r') as data_file:
+                times = data_file.variables['time']
+                time = num2date(times[:],units=times.units,calendar=times.calendar)
                 for i in range(nvars):
                     var1 = data_file.variables[vars[i]][0,slices["lat_s"]:slices["lat_e"], slices["lon_s"]:slices["lon_e"]]
                     stat_obj.acc_stat_loc(i,var1)
@@ -56,9 +55,6 @@ def process_data(directory_to_process, target_dir, job_name, slices, vars=("T2",
             #20200408,bing
             temporal_list[j] = list(time)
 
-            #print('Does ist work? 
') - #print(EU_stack_list[i][:,:,0]==EU_t2) - #print(EU_stack[:,:,1]==EU_msl except Exception as err: im_path = os.path.join(directory_to_process, im_file) #im = Dataset(im_path, mode = 'r') @@ -94,7 +90,7 @@ def process_netCDF_in_dir(src_dir,**kwargs): if not os.path.exists(target_dir): os.mkdir(target_dir) #target_file = os.path.join(target_dir, 'X_' + str(job_name) + '.hkl') # ML 2020/07/15: Make use of pickle-files only - target_file = os.path.join(target_dir, 'X_' + str(job_name) + '.pkl') + target_file = os.path.join(target_dir, 'X_' + str(job_name) + '.hkl') if os.path.exists(target_file): print(target_file," file exists in the directory ", target_dir) else: @@ -102,67 +98,6 @@ def process_netCDF_in_dir(src_dir,**kwargs): process_data(directory_to_process=directory_to_process, **kwargs) -#def split_data(target_dir, partition= [0.6, 0.2, 0.2]): - #split_dir = target_dir + "/splits" - #if not os.path.exists(split_dir): os.mkdir(split_dir) - #os.chdir(target_dir) - #files = glob.glob("*.hkl") - #filesList = sorted(files) - ##Bing: 20200415 - #temporal_files = glob.glob("*.pkl") - #temporal_filesList = sorted(temporal_files) - - ## determine correct indicesue - #train_begin = 0 - #train_end = round(partition[0] * len(filesList)) - 1 - #val_begin = train_end + 1 - #val_end = train_end + round(partition[1] * len(filesList)) - #test_begin = val_end + 1 - - - ## slightly adapting start and end because starts at the first index given and stops before(!) the last. - #train_files = filesList[train_begin:val_begin] - #val_files = filesList[val_begin:test_begin] - #test_files = filesList[test_begin:] - ##bing: 20200415 - #train_temporal_files = temporal_filesList[train_begin:val_begin] - #val_temporal_files = temporal_filesList[val_begin:test_begin] - #test_temporal_files = temporal_filesList[test_begin:] - - - #splits = {s: [] for s in ['train', 'test', 'val']} - #splits['val'] = val_files - #splits['test'] = test_files - #splits['train'] = train_files - - - #splits_temporal = {s: [] for s in ['train', 'test', 'val']} - #splits_temporal["train"] = train_temporal_files - #splits_temporal["val"] = val_temporal_files - #splits_temporal["test"] = test_temporal_files - - #for split in splits: - #X = [] - #X_temporal = [] - #files = splits[split] - #temporal_files = splits_temporal[split] - #for file, temporal_file in zip(files, temporal_files): - #data_file = os.path.join(target_dir,file) - #temporal_file = os.path.join(target_dir,temporal_file) - ##load data with hkl file - #data = hkl.load(data_file) - #temporal_data = pickle.load(open(temporal_file,"rb")) - #X_temporal = X_temporal + list(temporal_data) - #X = X + list(data) - #X = np.array(X) - #X_temporal = np.array(X_temporal) - #print ("X_temporal",X_temporal) - ##save training, val and test data into splits directoyr - #hkl.dump(X, os.path.join(split_dir, 'X_' + split + '.hkl')) - #hkl.dump(files, os.path.join(split_dir,'sources_' + split + '.hkl')) - #pickle.dump(X_temporal,open(os.path.join(split_dir,"T_"+split + ".pkl"),"wb")) - #print ("PICKLE FILE FOR SPLITS SAVED") - # ML 2020/05/15 S def get_unique_vars(varnames): vars_uni, varsind = np.unique(varnames,return_index = True) @@ -283,7 +218,7 @@ class Calc_data_stat: print("Statistics file '"+file_name+"' has already been processed. Thus, just pass here...") pass - def finalize_stat_master(self,path_out,vars_uni): + def finalize_stat_master(self,vars_uni): """ Performs final compuattion of statistics after accumulation from slave nodes. 
""" @@ -293,7 +228,6 @@ class Calc_data_stat: if len(vars_uni) > len(set(vars_uni)): raise ValueError("Input variable names are not unique.") - js_file = os.path.join(path_out,"statistics.json") nvars = len(vars_uni) n_jsfiles = len(self.nfiles) nfiles_all= np.sum(self.nfiles) @@ -400,69 +334,62 @@ class Calc_data_stat: # ML 2020/05/15 E -def split_data_multiple_years(target_dir,partition,varnames): - """ - Collect all the X_*.pkl data across years and split them to training, val and testing datatset - """ - #target_dirs = [os.path.join(target_dir,year) for year in years] - #os.chdir(target_dir) - splits_dir = os.path.join(target_dir,"splits") - os.makedirs(splits_dir, exist_ok=True) - splits = {s: [] for s in list(partition.keys())} - # ML 2020/05/19 S - vars_uni, varsind, nvars = get_unique_vars(varnames) - stat_obj = Calc_data_stat(nvars) +# ML 2020/08/03 Not used anymore! +#def split_data_multiple_years(target_dir,partition,varnames): + #""" + #Collect all the X_*.hkl data across years and split them to training, val and testing datatset + #""" + ##target_dirs = [os.path.join(target_dir,year) for year in years] + ##os.chdir(target_dir) + #splits_dir = os.path.join(target_dir,"splits") + #os.makedirs(splits_dir, exist_ok=True) + #splits = {s: [] for s in list(partition.keys())} + ## ML 2020/05/19 S + #vars_uni, varsind, nvars = get_unique_vars(varnames) + #stat_obj = Calc_data_stat(nvars) - for split in partition.keys(): - values = partition[split] - files = [] - X = [] - Temporal_X = [] - for year in values.keys(): - file_dir = os.path.join(target_dir,year) - for month in values[year]: - month = "{0:0=2d}".format(month) - # ML 2020/07/15: Make use of pickle-files only + #for split in partition.keys(): + #values = partition[split] + #files = [] + #X = [] + #Temporal_X = [] + #for year in values.keys(): + #file_dir = os.path.join(target_dir,year) + #for month in values[year]: + #month = "{0:0=2d}".format(month) #hickle_file = "X_{}.hkl".format(month) - pickle_file = "X_{}.pkl".format(month) - #20200408:bing - temporal_file = "T_{}.pkl".format(month) + ##20200408:bing + #temporal_file = "T_{}.pkl".format(month) + ##data_file = os.path.join(file_dir,hickle_file) #data_file = os.path.join(file_dir,hickle_file) - data_file = os.path.join(file_dir,pickle_file) - temporal_data_file = os.path.join(file_dir,temporal_file) - files.append(data_file) - # ML 2020/07/15: Make use of pickle-files only + #temporal_data_file = os.path.join(file_dir,temporal_file) + #files.append(data_file) #data = hkl.load(data_file) - with open(data_file,"rb") as fdata: - data = pickle.load(fdata) - with open(temporal_data_file,"rb") as ftemp: - temporal_data = pickle.load(ftemp) - X = X + list(data) - Temporal_X = Temporal_X + list(temporal_data) - # process stat-file: - stat_obj.acc_stat_master(file_dir,int(month)) - X = np.array(X) - Temporal_X = np.array(Temporal_X) - print("==================={}=====================".format(split)) - print ("Sources for {} dataset are {}".format(split,files)) - print("Number of images in {} dataset is {} ".format(split,len(X))) - print ("dataset shape is {}".format(np.array(X).shape)) - # ML 2020/07/15: Make use of pickle-files only - with open(os.path.join(splits_dir , 'X_' + split + '.pkl'),"wb") as data_file: - pickle.dump(X,data_file,protocol=4) - #hkl.dump(X, os.path.join(splits_dir , 'X_' + split + '.hkl')) - - with open(os.path.join(splits_dir,"T_"+split + ".pkl"),"wb") as temp_file: - pickle.dump(Temporal_X, temp_file) + #with open(temporal_data_file,"rb") as ftemp: + 
#temporal_data = pickle.load(ftemp) + #X = X + list(data) + #Temporal_X = Temporal_X + list(temporal_data) + ## process stat-file: + #stat_obj.acc_stat_master(file_dir,int(month)) + #X = np.array(X) + #Temporal_X = np.array(Temporal_X) + #print("==================={}=====================".format(split)) + #print ("Sources for {} dataset are {}".format(split,files)) + #print("Number of images in {} dataset is {} ".format(split,len(X))) + #print ("dataset shape is {}".format(np.array(X).shape)) + ## ML 2020/07/15: Make use of pickle-files only + #with open(os.path.join(splits_dir , 'X_' + split + '.pkl'),"wb") as data_file: + #pickle.dump(X,data_file,protocol=4) + ##hkl.dump(X, os.path.join(splits_dir , 'X_' + split + '.hkl')) + + #with open(os.path.join(splits_dir,"T_"+split + ".pkl"),"wb") as temp_file: + #pickle.dump(Temporal_X, temp_file) - # ML 2020/07/15: Make use of pickle-files only - with open(os.path.join(splits_dir,'sources_' + split + '.pkl'),"wb") as src_file: - pickle.dump(files, src_file) #hkl.dump(files, os.path.join(splits_dir,'sources_' + split + '.hkl')) - # write final statistics json-file - stat_obj.finalize_stat_master(target_dir,vars_uni) - stat_obj.write_stat_json(splits_dir) + ## write final statistics json-file + #stat_obj.finalize_stat_master(target_dir,vars_uni) + #stat_obj.write_stat_json(splits_dir)