diff --git a/Zam347_scripts/DataPreprocess.sh b/Zam347_scripts/DataPreprocess.sh
index 764b949433702053b0889671ebc36880f01c690b..395caf65779518747bf9ee3ace1a82683a4cc5f6 100755
--- a/Zam347_scripts/DataPreprocess.sh
+++ b/Zam347_scripts/DataPreprocess.sh
@@ -2,7 +2,7 @@
 
 source_dir=/home/$USER/extractedData
-destination_dir=/home/$USER/preprocessedData/era5-Y2015toY2017M01to12-128x160-74d00N71d00E-T_MSL_gph500/hickle
+destination_dir=/home/$USER/preprocessedData/era5-Y2017M01to02-128x160-74d00N71d00E-T_MSL_gph500/hickle
 
 declare -a years=("2017")
 for year in "${years[@]}";
@@ -13,7 +13,7 @@ for year in "${years[@]}";
         --source_dir ${source_dir}/${year}/ \
         --destination_dir ${destination_dir}/${year}/ --vars T2 MSL gph500 --lat_s 74 --lat_e 202 --lon_s 550 --lon_e 710
 done
-python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_split_data_multi_years.py --destination_dir ${destination_dir}
+python ../../workflow_parallel_frame_prediction/DataPreprocess/mpi_split_data_multi_years.py --destination_dir ${destination_dir} --varnames T2 MSL gph500
diff --git a/Zam347_scripts/DataPreprocess_to_tf.sh b/Zam347_scripts/DataPreprocess_to_tf.sh
index 608f95348b25c2169ae2963821639de8947c322b..ec8cf66ed66f891ca45b19c0d0b8350011f8cd35 100755
--- a/Zam347_scripts/DataPreprocess_to_tf.sh
+++ b/Zam347_scripts/DataPreprocess_to_tf.sh
@@ -1,4 +1,4 @@
 #!/bin/bash -x
 
-python ../video_prediction/datasets/era5_dataset_v2.py /home/${USER}/preprocessedData/era5-Y2015toY2017M01to12-128x160-74d00N71d00E-T_MSL_gph500/hickle/splits/ /home/${USER}/preprocessedData/era5-Y2015toY2017M01to12-128x160-74d00N71d00E-T_MSL_gph500/tfrecords/ -vars T2 MSL gph500 -height 128 -width 160 -seq_length 20
+python ../video_prediction/datasets/era5_dataset_v2.py /home/${USER}/preprocessedData/era5-Y2017M01to02-128x160-74d00N71d00E-T_MSL_gph500/hickle/splits/ /home/${USER}/preprocessedData/era5-Y2017M01to02-128x160-74d00N71d00E-T_MSL_gph500/tfrecords/ -vars T2 MSL gph500 -height 128 -width 160 -seq_length 20
diff --git a/video_prediction/datasets/era5_dataset_v2.py b/video_prediction/datasets/era5_dataset_v2.py
index 606f32f0bb47a66e1190be5b9585e6ef9b5cf752..00111565556014aaa868ba4f8d0c98d8c25ac732 100644
--- a/video_prediction/datasets/era5_dataset_v2.py
+++ b/video_prediction/datasets/era5_dataset_v2.py
@@ -14,8 +14,9 @@ from video_prediction.datasets.base_dataset import VarLenFeatureVideoDataset
 from os import path
 import sys
 sys.path.append(path.abspath('../../workflow_parallel_frame_prediction/'))
-from DataPreprocess.process_netCDF_v2 import get_stat
-from DataPreprocess.process_netCDF_v2 import get_stat_allvars
+import DataPreprocess.process_netCDF_v2
+from DataPreprocess.process_netCDF_v2 import get_unique_vars
+from DataPreprocess.process_netCDF_v2 import Calc_data_stat
 #from base_dataset import VarLenFeatureVideoDataset
 from collections import OrderedDict
 from tensorflow.contrib.training import HParams
@@ -160,37 +161,116 @@ def save_tf_record(output_fname, sequences):
         })
         example = tf.train.Example(features=features)
         writer.write(example.SerializeToString())
+
+class Norm_data:
+    """
+    Class for normalizing data. The statistical data for normalization (minimum, maximum, average, standard deviation etc.) is expected to be available from a statistics-dictionary
+    created with the Calc_data_stat class (see 'process_netCDF_v2.py').
+ """ + + ### set known norms and the requested statistics (to be retrieved from statistics.json) here ### + known_norms = {} + known_norms["minmax"] = ["min","max"] + known_norms["znorm"] = ["avg","sigma"] + + def __init__(self,varnames): + """Initialize the instance by setting the variable names to be handled and the status (for sanity checks only) as attributes.""" + varnames_uni, _, nvars = get_unique_vars(varnames) + + self.varnames = varnames_uni + self.status_ok= False + + def check_and_set_norm(self,stat_dict,norm): + """ + Checks if the statistics-dictionary provides the required data for selected normalization method and expands the instance's attributes accordingly. + Example: minmax-normalization requires the minimum and maximum value of a variable named var1. + If the requested values are provided by the statistics-dictionary, the instance gets the attributes 'var1min' and 'var1max',respectively. + """ + + # some sanity checks + if not norm in self.known_norms.keys(): # valid normalization requested? + print("Please select one of the following known normalizations: ") + for norm_avail in self.known_norms.keys(): + print(norm_avail) + raise ValueError("Passed normalization '"+norm+"' is unknown.") + + if not all(items in stat_dict for items in self.varnames): # all variables found in dictionary? + print("Keys in stat_dict:") + print(stat_dict.keys()) + + print("Requested variables:") + print(self.varnames) + raise ValueError("Could not find all requested variables in statistics dictionary.") + + # create all attributes for the instance + for varname in self.varnames: + for stat_name in self.known_norms[norm]: + #setattr(self,varname+stat_name,stat_dict[varname][0][stat_name]) + setattr(self,varname+stat_name,Calc_data_stat.get_stat_vars(stat_dict,stat_name,varname)) + + self.status_ok = True # set status for normalization -> ready + + def norm_var(self,data,varname,norm): + """ + Performs given normalization on input data (given that the instance is already set up) + """ + + # some sanity checks + if not self.status_ok: raise ValueError("Norm_data-instance needs to be initialized and checked first.") # status ready? + + if not norm in self.known_norms.keys(): # valid normalization requested? + print("Please select one of the following known normalizations: ") + for norm_avail in self.known_norms.keys(): + print(norm_avail) + raise ValueError("Passed normalization '"+norm+"' is unknown.") + + # do the normalization and return + if norm == "minmax": + return((data[...] - getattr(self,varname+"min"))/(getattr(self,varname+"max") - getattr(self,varname+"min"))) + elif norm == "znorm": + return((data[...] - getattr(self,varname+"avg"))/getattr(self,varname+"sigma")**2) + + def denorm_var(self,data,varname,norm): + """ + Performs given denormalization on input data (given that the instance is already set up), i.e. inverse method to norm_var + """ + + # some sanity checks + if not self.status_ok: raise ValueError("Norm_data-instance needs to be initialized and checked first.") # status ready? + + if not norm in self.known_norms.keys(): # valid normalization requested? + print("Please select one of the following known normalizations: ") + for norm_avail in self.known_norms.keys(): + print(norm_avail) + raise ValueError("Passed normalization '"+norm+"' is unknown.") + + # do the denormalization and return + if norm == "minmax": + return(data[...] * (getattr(self,varname+"max") - getattr(self,varname+"min")) + getattr(self,varname+"max")) + elif norm == "znorm": + return(data[...] 
+            return(data[...] * getattr(self,varname+"sigma") + getattr(self,varname+"avg"))
+
 def read_frames_and_save_tf_records(output_dir,input_dir,partition_name,vars_in,seq_length=20,sequences_per_file=128,height=64,width=64,channels=3,**kwargs):#Bing: original 128
     # ML 2020/04/08:
     # Include vars_in for more flexible data handling (normalization and reshaping)
     # and optional keyword argument for kind of normalization
-    known_norms = ["minmax"] # may be more elegant to define a class here?
-
-    output_dir = os.path.join(output_dir,partition_name)
-    os.makedirs(output_dir,exist_ok=True)
-    nvars = len(vars_in)
-    vars_uni, indrev = np.unique(vars_in,return_inverse=True)
     if 'norm' in kwargs:
         norm = kwargs.get("norm")
-        if (not norm in knwon_norms):
-            raise ValueError("Pass valid normalization identifier.")
-            print("Known identifiers are: ")
-            for norm_name in known_norm:
-                print('"'+norm_name+'"')
     else:
         norm = "minmax"
+        print("Make use of default minmax-normalization...")
+
+    output_dir = os.path.join(output_dir,partition_name)
+    os.makedirs(output_dir,exist_ok=True)
 
-    # open statistics file
-    with open(os.path.join(input_dir,"statistics.json")) as js_file:
-        data = json.load(js_file)
+    norm_cls = Norm_data(vars_in)        # init normalization-instance
+    nvars    = len(vars_in)
 
-    if (norm == "minmax"):
-        varmin, varmax = get_stat_allvars(data,"min",vars_in), get_stat_allvars(data,"max",vars_in)
-
-        #print(len(varmin))
-        #print(varmin)
+    # open statistics file and feed it to norm-instance
+    with open(os.path.join(input_dir,"statistics.json")) as js_file:
+        norm_cls.check_and_set_norm(json.load(js_file),norm)
 
     sequences = []
     sequence_iter = 0
@@ -216,12 +296,8 @@ def read_frames_and_save_tf_records(output_dir,input_dir,partition_name,vars_in,
     ###Normalization should adpot the selected variables, here we used duplicated channel temperature variables
     sequences = np.array(sequences)
     ### normalization
-    # ML 2020/04/08:
-    # again rather inelegant/inefficient as...
-    # a) normalization should be cast in class definition (with initialization, setting of norm. approach including
-    #    data retrieval and the normalization itself
     for i in range(nvars):
-        sequences[:,:,:,:,i] = (sequences[:,:,:,:,i]-varmin[i])/(varmax[i]-varmin[i])
+        sequences[:,:,:,:,i] = norm_cls.norm_var(sequences[:,:,:,:,i],vars_in[i],norm)
 
     output_fname = 'sequence_{0}_to_{1}.tfrecords'.format(last_start_sequence_iter, sequence_iter - 1)
     output_fname = os.path.join(output_dir, output_fname)
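
Usage note: below is a minimal sketch (not part of the patch) of how the new Norm_data class is meant to be driven, mirroring what read_frames_and_save_tf_records does. The import path, the statistics.json location and the dummy array shape are illustrative assumptions.

    import json
    import numpy as np
    # Norm_data as introduced by this patch; the import path is an assumption.
    from video_prediction.datasets.era5_dataset_v2 import Norm_data

    vars_in = ["T2", "MSL", "gph500"]            # variables in channel order
    norm_cls = Norm_data(vars_in)                # instance tracks the unique variable names

    # statistics.json is assumed to have been written by Calc_data_stat during preprocessing
    with open("statistics.json") as js_file:
        norm_cls.check_and_set_norm(json.load(js_file), "minmax")

    data = np.random.rand(20, 128, 160)          # dummy sequence for the T2 channel
    data_norm = norm_cls.norm_var(data, "T2", "minmax")        # (x - min)/(max - min)
    data_back = norm_cls.denorm_var(data_norm, "T2", "minmax") # inverse transform
    assert np.allclose(data, data_back)          # round trip is lossless when max > min

The same pattern applies to "znorm", which uses the "avg" and "sigma" entries of the statistics dictionary instead of "min" and "max".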