diff --git a/video_prediction_savp/env_setup/create_env.sh b/video_prediction_savp/env_setup/create_env.sh
index ad388826caf1d077c1c6434acae29d6cbaa9c6fc..888f543db6891ebfd2a57f06dc6cce3f5f2743dc 100644
--- a/video_prediction_savp/env_setup/create_env.sh
+++ b/video_prediction_savp/env_setup/create_env.sh
@@ -88,6 +88,7 @@ if [[ "$ENV_EXIST" == 0 ]]; then
   fi
   # expand PYTHONPATH...
   export PYTHONPATH=${WORKING_DIR}:$PYTHONPATH >> ${activate_virt_env}
+  export PYTHONPATH=${WORKING_DIR}/utils:$PYTHONPATH >> ${activate_virt_env}
   #export PYTHONPATH=/p/home/jusers/${USER}/juwels/.local/bin:$PYTHONPATH
   export PYTHONPATH=${WORKING_DIR}/external_package/lpips-tensorflow:$PYTHONPATH >> ${activate_virt_env}
@@ -98,6 +99,7 @@ if [[ "$ENV_EXIST" == 0 ]]; then
   echo "" >> ${activate_virt_env}
   echo "# Expand PYTHONPATH..." >> ${activate_virt_env}
   echo "export PYTHONPATH=${WORKING_DIR}:\$PYTHONPATH" >> ${activate_virt_env}
+  echo "export PYTHONPATH=${WORKING_DIR}/utils/:\$PYTHONPATH" >> ${activate_virt_env}
   #export PYTHONPATH=/p/home/jusers/${USER}/juwels/.local/bin:\$PYTHONPATH
   echo "export PYTHONPATH=${WORKING_DIR}/external_package/lpips-tensorflow:\$PYTHONPATH" >> ${activate_virt_env}
diff --git a/video_prediction_savp/env_setup/modules_preprocess.sh b/video_prediction_savp/env_setup/modules_preprocess.sh
index c4a242b0a739eb65c7a340dd8e3a3fbb57b04408..a9de812dbde625a18198fe078ecb86c09286ed6d 100755
--- a/video_prediction_savp/env_setup/modules_preprocess.sh
+++ b/video_prediction_savp/env_setup/modules_preprocess.sh
@@ -9,7 +9,7 @@ HOST_NAME=`hostname`
 echo "Start loading modules on ${HOST_NAME} required for preprocessing..."
-echo "This script is used by: "
+echo "modules_preprocess.sh is used by: "
 echo "* DataExtraction.sh"
 echo "* DataPreprocess.sh"
diff --git a/video_prediction_savp/env_setup/modules_train.sh b/video_prediction_savp/env_setup/modules_train.sh
index cdbc436e1976773132aa636a3f1cedfa506d3a14..d45144340d334430b3d95580ceb2e74c8105e18a 100755
--- a/video_prediction_savp/env_setup/modules_train.sh
+++ b/video_prediction_savp/env_setup/modules_train.sh
@@ -9,6 +9,10 @@ HOST_NAME=`hostname`
 echo "Start loading modules on ${HOST_NAME}..."
+echo "modules_train.sh is used by: "
+echo "* DataPreprocess_to_tf.sh"
+echo "* train_era5.sh"
+echo "* generate_era5.sh"
 
 module purge
 module use $OTHERSTAGES
diff --git a/video_prediction_savp/env_setup/requirements.txt b/video_prediction_savp/env_setup/requirements.txt
index 173b8a10c8dec1d8186adc84c144b79863406d3f..c77693a918331c4049c43f70190ca4eea5c1e56a 100644
--- a/video_prediction_savp/env_setup/requirements.txt
+++ b/video_prediction_savp/env_setup/requirements.txt
@@ -1,5 +1,6 @@
 opencv-python==4.2.0.34
 scipy
+matplotlib==3.3.0
 scikit-image
 pandas
 hickle
diff --git a/video_prediction_savp/scripts/generate_transfer_learning_finetune.py b/video_prediction_savp/scripts/generate_transfer_learning_finetune.py
index 1fcbd1cf97442f1ea440039a0bb6769473b957f3..7948d650cc270c9ee33dcd32c2e03c70f9216225 100644
--- a/video_prediction_savp/scripts/generate_transfer_learning_finetune.py
+++ b/video_prediction_savp/scripts/generate_transfer_learning_finetune.py
@@ -32,8 +32,7 @@ import datetime
 # Scarlet 2020/05/28: access to statistical values in json file
 from os import path
 import sys
-sys.path.append(path.abspath('../video_prediction/datasets/'))
-from era5_dataset_v2 import Norm_data
+from normalization import Norm_data
 from os.path import dirname
 from netCDF4 import Dataset,date2num
 from metadata import MetaData as MetaData
diff --git a/video_prediction_savp/utils/general_utils.py b/video_prediction_savp/utils/general_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..9318726a270d34c38adc92e7803a588d96fc5243
--- /dev/null
+++ b/video_prediction_savp/utils/general_utils.py
@@ -0,0 +1,60 @@
+"""
+Some auxiliary routines which may be used throughout the project.
+Provides: * get_unique_vars
+          * add_str_to_path
+          * is_integer
+"""
+
+# import modules
+import os
+import sys
+import numpy as np
+
+# routines
+def get_unique_vars(varnames):
+    """
+    :param varnames: list of variable names (or any other list of strings)
+    :return: list with unique elements of inputted varnames list
+    """
+    vars_uni, varsind = np.unique(varnames, return_index=True)
+    nvars_uni = len(vars_uni)
+
+    return (vars_uni, varsind, nvars_uni)
+
+
+def add_str_to_path(path_in, add_str):
+    """
+    :param path_in: input path which is a candidate to be extended by add_str (see below)
+    :param add_str: string to be added to path_in if not already present
+    :return: extended version of path_in (by add_str) if add_str is not already part of path_in.
+             The function can also handle carriage returns in input strings obtained by reading a file.
+    """
+
+    l_linebreak = path_in.endswith("\n")  # flag for carriage return at the end of input string
+    line_str = path_in.rstrip("\n")
+
+    # append only if line_str ends neither with add_str nor with its slash-stripped variant
+    if (not line_str.endswith(add_str)) and \
+       (not line_str.endswith(add_str.rstrip("/"))):
+
+        line_str = line_str + add_str + "/"
+    else:
+        print(add_str + " is already part of " + line_str + ". No change is performed.")
+
+    if l_linebreak:  # re-add carriage return to string if required
+        return (line_str + "\n")
+    else:
+        return (line_str)
+
+
+def is_integer(n):
+    """
+    :param n: input string
+    :return: True if n is a string containing an integer, else False
+    """
+
+    try:
+        float(n)
+    except ValueError:
+        return False
+    else:
+        return float(n).is_integer()
\ No newline at end of file
diff --git a/video_prediction_savp/metadata.py b/video_prediction_savp/utils/metadata.py
similarity index 92%
rename from video_prediction_savp/metadata.py
rename to video_prediction_savp/utils/metadata.py
index 8f61d5766169c08d793b665253a8cd1e86a80548..fada7c47b1cf3d79a4d88fe6e5584a7da5bc7ec3 100644
--- a/video_prediction_savp/metadata.py
+++ b/video_prediction_savp/utils/metadata.py
@@ -1,5 +1,5 @@
 """
-Classes and routines to retrieve and handle meta-data
+Class to retrieve and handle meta-data
 """
 
 import os
@@ -8,6 +8,7 @@ import time
 import numpy as np
 import json
 from netCDF4 import Dataset
+from general_utils import is_integer, add_str_to_path
 
 class MetaData:
     """
@@ -337,41 +338,6 @@ class MetaData:
 # ----------------------------------- end of class MetaData -----------------------------------
 
-# some auxilary functions which are not bound to MetaData-class
-
-def add_str_to_path(path_in,add_str):
-
-    """
-    Adds add_str to path_in if path_in does not already end with add_str.
-    Function is also capable to handle carriage returns for handling input-strings obtained by reading a file.
-    """
-
-    l_linebreak = path_in.endswith("\n") # flag for carriage return at the end of input string
-    line_str = path_in.rstrip("\n")
-
-    if (not line_str.endswith(add_str)) or \
-       (not line_str.endswith(add_str.rstrip("/"))):
-
-        line_str = line_str + add_str + "/"
-    else:
-        print(add_str+" is already part of "+line_str+". No change is performed.")
-
-    if l_linebreak: # re-add carriage return to string if required
-        return(line_str+"\n")
-    else:
-        return(line_str)
-
-
-def is_integer(n):
-    '''
-    Checks if input string is numeric and of type integer.
-    '''
-    try:
-        float(n)
-    except ValueError:
-        return False
-    else:
-        return float(n).is_integer()
diff --git a/video_prediction_savp/utils/normalization.py b/video_prediction_savp/utils/normalization.py
new file mode 100644
index 0000000000000000000000000000000000000000..e3ad06164a39b9dd3676d518e908b003af55f364
--- /dev/null
+++ b/video_prediction_savp/utils/normalization.py
@@ -0,0 +1,96 @@
+"""
+Class for normalizing data. The statistical data for normalization (minimum, maximum, average, standard deviation etc.)
+is expected to be available from a statistics dictionary created with the Calc_data_stat class (see 'statistics.py').
+"""
+
+from general_utils import get_unique_vars
+from statistics import Calc_data_stat  # local utils/statistics.py (shadows the stdlib 'statistics' module)
+import numpy as np
+
+class Norm_data:
+
+    ### set known norms and the requested statistics (to be retrieved from statistics.json) here ###
+    known_norms = {}
+    known_norms["minmax"] = ["min", "max"]
+    known_norms["znorm"] = ["avg", "sigma"]
+
+    def __init__(self, varnames):
+        """Initialize the instance by setting the variable names to be handled and the status (for sanity checks only) as attributes."""
+        varnames_uni, _, nvars = get_unique_vars(varnames)
+
+        self.varnames = varnames_uni
+        self.status_ok = False
+
+    def check_and_set_norm(self, stat_dict, norm):
+        """
+        Checks if the statistics dictionary provides the required data for the selected normalization method and expands the instance's attributes accordingly.
+        Example: minmax-normalization requires the minimum and maximum value of a variable named var1.
+                 If the requested values are provided by the statistics dictionary, the instance gets the attributes 'var1min' and 'var1max', respectively.
+        """
+
+        # some sanity checks
+        if not norm in self.known_norms.keys():  # valid normalization requested?
+            print("Please select one of the following known normalizations: ")
+            for norm_avail in self.known_norms.keys():
+                print(norm_avail)
+            raise ValueError("Passed normalization '" + norm + "' is unknown.")
+
+        if not all(items in stat_dict for items in self.varnames):  # all variables found in dictionary?
+            print("Keys in stat_dict:")
+            print(stat_dict.keys())
+
+            print("Requested variables:")
+            print(self.varnames)
+            raise ValueError("Could not find all requested variables in statistics dictionary.")
+
+        # create all attributes for the instance
+        for varname in self.varnames:
+            for stat_name in self.known_norms[norm]:
+                # setattr(self,varname+stat_name,stat_dict[varname][0][stat_name])
+                setattr(self, varname + stat_name, Calc_data_stat.get_stat_vars(stat_dict, stat_name, varname))
+
+        self.status_ok = True  # set status for normalization -> ready
+
+    def norm_var(self, data, varname, norm):
+        """
+        Performs the given normalization on the input data (given that the instance is already set up).
+        """
+
+        # some sanity checks
+        if not self.status_ok: raise ValueError(
+            "Norm_data-instance needs to be initialized and checked first.")  # status ready?
+
+        if not norm in self.known_norms.keys():  # valid normalization requested?
+            print("Please select one of the following known normalizations: ")
+            for norm_avail in self.known_norms.keys():
+                print(norm_avail)
+            raise ValueError("Passed normalization '" + norm + "' is unknown.")
+
+        # do the normalization and return
+        if norm == "minmax":
+            return ((data[...] - getattr(self, varname + "min")) / (
+                    getattr(self, varname + "max") - getattr(self, varname + "min")))
+        elif norm == "znorm":
+            return ((data[...] - getattr(self, varname + "avg")) / getattr(self, varname + "sigma") ** 2)
+
+    def denorm_var(self, data, varname, norm):
+        """
+        Performs the given denormalization on the input data (given that the instance is already set up), i.e. the inverse method to norm_var.
+        """
+
+        # some sanity checks
+        if not self.status_ok: raise ValueError(
+            "Norm_data-instance needs to be initialized and checked first.")  # status ready?
+
+        if not norm in self.known_norms.keys():  # valid normalization requested?
+            print("Please select one of the following known normalizations: ")
+            for norm_avail in self.known_norms.keys():
+                print(norm_avail)
+            raise ValueError("Passed normalization '" + norm + "' is unknown.")
+
+        # do the denormalization and return
+        if norm == "minmax":
+            return (data[...] * (getattr(self, varname + "max") - getattr(self, varname + "min")) + getattr(self,
+                    varname + "min"))
+        elif norm == "znorm":
+            return (data[...] * getattr(self, varname + "sigma") ** 2 + getattr(self, varname + "avg"))
\ No newline at end of file
diff --git a/video_prediction_savp/utils/statistics.py b/video_prediction_savp/utils/statistics.py
new file mode 100644
index 0000000000000000000000000000000000000000..ea537fd4928a8d6ff9019e8cafff24a6b60413ce
--- /dev/null
+++ b/video_prediction_savp/utils/statistics.py
@@ -0,0 +1,242 @@
+"""
+Class to calculate and save statistics in the ambs-workflow, parallelized by PyStager.
+In addition to saving the statistics in json-files, it also comprises methods to read those files.
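+
+A rough usage sketch for the slave-node path (variable names, data and paths below are
+illustrative only, not part of any workflow script):
+    stats = Calc_data_stat(1)               # track statistics for one variable
+    for data in data_per_file:              # one numpy array per processed file
+        stats.acc_stat_loc(0, data)         # accumulate min, max and (summed) average
+    stats.finalize_stat_loc(["t2m"])        # readjust the average and build the dictionary
+    stats.write_stat_json("/some/output_dir", file_id=1)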
+"""
+
+import os
+import sys
+import time
+import numpy as np
+import json
+from general_utils import get_unique_vars
+
+class Calc_data_stat:
+
+    def __init__(self, nvars):
+        """
+        Initializes the instance for later use, i.e. initializes attributes with expected shape
+        """
+        self.stat_dict = {}
+        self.varmin = np.full((nvars, 1), np.nan)  # avoid rank one-arrays
+        self.varmax = np.full((nvars, 1), np.nan)
+        self.varavg = np.zeros((nvars,
+                                1))  # second dimension acts as placeholder for averaging on master node collecting json-files from slave nodes
+        self.nfiles = [0]  # number of processed files
+        self.mode = ""  # mode to distinguish between processing on slave and master nodes (sanity check)
+        self.jsfiles = [""]  # list of processed json-files (master-mode only!)
+
+    def acc_stat_loc(self, ivar, data):
+        """
+        Performs accumulation of all statistics while looping through all data files (i.e. updates the statistics) on slave nodes
+        """
+        if not self.mode:
+            self.mode = "loc"
+        elif self.mode == "master":
+            raise ValueError("Cannot switch to loc-mode during runtime...")
+        else:
+            pass
+
+        self.varmin[ivar] = np.fmin(self.varmin[ivar], np.amin(data))
+        self.varmax[ivar] = np.fmax(self.varmax[ivar], np.amax(data))
+        self.varavg[ivar, 0] += np.average(
+            data)  # note that we sum the average -> readjustment required in the final step
+        if (ivar == 0): self.nfiles[0] += 1
+
+    def finalize_stat_loc(self, varnames):
+        """
+        Finalizes computation of statistics after going through all the data on slave nodes.
+        Afterwards the statistics dictionary is ready for being written to a json-file.
+        """
+
+        if self.mode != "loc":
+            raise ValueError("Object is not in loc-mode. Probably some master-method has been called previously.")
+
+        if self.stat_dict: raise ValueError("Statistics dictionary is not empty.")
+
+        vars_uni, varsind, nvars = get_unique_vars(varnames)
+
+        varmin, varmax, varavg = self.varmin[varsind], self.varmax[varsind], self.varavg[varsind, 0]
+
+        for i in range(nvars):
+            varavg[i] /= self.nfiles  # for adjusting the (summed) average
+
+            self.stat_dict[vars_uni[i]] = []
+            self.stat_dict[vars_uni[i]].append({
+                'min': varmin[i, 0].tolist(),
+                'max': varmax[i, 0].tolist(),
+                'avg': varavg[i].tolist()
+            })
+        self.stat_dict["common_stat"] = [
+            {"nfiles": self.nfiles[0]}]
+
+    def acc_stat_master(self, file_dir, file_id):
+        """
+        Opens a statistics file (created by a slave node) and accumulates its content.
+        """
+
+        if (int(file_id) <= 0): raise ValueError("Invalid file_id passed.")
+
+        if not self.mode:
+            self.mode = "master"
+        elif self.mode == "loc":
+            raise ValueError("Cannot switch to master-mode during runtime...")
+        else:
+            pass
+
+        # sanity check: check if dictionary is initialized with unique values only
+        if self.stat_dict.keys() > set(self.stat_dict.keys()):
+            raise ValueError("Initialized dictionary contains duplicates of variables. Need unique collection instead.")
+        else:
+            pass
+
+        file_name = os.path.join(file_dir, "stat_{0:0=2d}.json".format(int(file_id)))
+
+        if not file_name in self.jsfiles:
+            print("Try to open: '" + file_name + "'")
+
+            try:
+                with open(file_name) as js_file:
+                    dict_in = json.load(js_file)
+
+                    # sanity check
+                    if (len(dict_in.keys()) - 1 != len(self.varmin)):
+                        raise ValueError(
+                            "Different number of variables found in json-file '" + file_name + "' than expected from statistics object.")
+
+                    self.varmin = np.fmin(self.varmin, Calc_data_stat.get_stat_allvars(dict_in, "min"))
+                    self.varmax = np.fmax(self.varmax, Calc_data_stat.get_stat_allvars(dict_in, "max"))
+
+                    if (np.all(self.varavg == 0.) or self.nfiles[0] == 0):
+                        self.varavg = Calc_data_stat.get_stat_allvars(dict_in, "avg")
+                        self.nfiles[0] = Calc_data_stat.get_common_stat(dict_in, "nfiles")
+                        self.jsfiles[0] = file_name
+                    else:
+                        self.varavg = np.append(self.varavg, Calc_data_stat.get_stat_allvars(dict_in, "avg"), axis=1)
+                        self.nfiles.append(Calc_data_stat.get_common_stat(dict_in, "nfiles"))
+                        self.jsfiles.append(file_name)
+            except IOError:
+                print("Cannot handle statistics file '" + file_name + "' to be processed.")
+            except ValueError:
+                print("Cannot retrieve all required statistics from '" + file_name + "'")
+        else:
+            print("Statistics file '" + file_name + "' has already been processed. Thus, just pass here...")
+            pass
+
+    def finalize_stat_master(self, vars_uni):
+        """
+        Performs final computation of statistics after accumulation from slave nodes.
+        """
+        if self.mode != "master":
+            raise ValueError("Object is not in master-mode. Probably some loc-method has been called previously.")
+
+        if len(vars_uni) > len(set(vars_uni)):
+            raise ValueError("Input variable names are not unique.")
+
+        nvars = len(vars_uni)
+        n_jsfiles = len(self.nfiles)
+        nfiles_all = np.sum(self.nfiles)
+        avg_wgt = np.array(self.nfiles, dtype=float) / float(nfiles_all)
+
+        varmin, varmax = self.varmin, self.varmax
+        varavg = np.sum(np.multiply(self.varavg, avg_wgt), axis=1)  # calculate weighted average
+
+        for i in range(nvars):
+            self.stat_dict[vars_uni[i]] = []
+            self.stat_dict[vars_uni[i]].append({
+                'min': varmin[i, 0].tolist(),
+                'max': varmax[i, 0].tolist(),
+                'avg': varavg[i].tolist()
+            })
+        self.stat_dict["common_stat"] = [
+            {"nfiles": int(nfiles_all),
+             "jsfiles": self.jsfiles
+             }]
+
+    @staticmethod
+    def get_stat_allvars(stat_dict, stat_name):
+        """
+        Unpacks the statistics dictionary and returns the values of stat_name for all variables contained in the dictionary.
+        """
+
+        # some sanity checks
+        if not stat_dict: raise ValueError("Input dictionary is still empty! Cannot access anything from it.")
+        if not "common_stat" in stat_dict.keys(): raise ValueError(
+            "Input dictionary does not seem to be a proper statistics dictionary as common_stat-element is missing.")
+
+        stat_dict_filter = (stat_dict).copy()
+        stat_dict_filter.pop("common_stat")
+
+        if not stat_dict_filter.keys(): raise ValueError("Input dictionary does not contain any variables.")
+
+        try:
+            varstat = np.array([stat_dict_filter[i][0][stat_name] for i in [*stat_dict_filter.keys()]])
+            if np.ndim(varstat) == 1:  # avoid returning rank 1-arrays
+                return varstat.reshape(-1, 1)
+            else:
+                return varstat
+        except:
+            raise ValueError("Could not find " + stat_name + " for all variables of input dictionary.")
+
+    @staticmethod
+    def get_stat_vars(stat_dict, stat_name, vars_in):
+        """
+        Retrieves the requested statistics (stat_name) for all unique variables listed in vars_in, given the statistics dictionary.
+        If more than one unique variable is processed, this method returns a list; otherwise a scalar is returned.
+        """
+
+        if not stat_dict: raise ValueError("Statistics dictionary is still empty! Cannot access anything from it.")
+        if not "common_stat" in stat_dict.keys(): raise ValueError(
+            "Input dictionary does not seem to be a proper statistics dictionary as common_stat-element is missing.")
+
+        vars_uni, indrev = np.unique(vars_in, return_inverse=True)
+
+        try:
+            if len(vars_uni) > 1:
+                return ([stat_dict[var][0][stat_name] for var in vars_uni[indrev]])
+            else:
+                return (stat_dict[vars_uni[0]][0][stat_name])
+        except:
+            raise ValueError("Could not find " + stat_name + " for all variables of input dictionary.")
+
+    @staticmethod
+    def get_common_stat(stat_dict, stat_name):
+
+        if not stat_dict: raise ValueError("Input dictionary is still empty! Cannot access anything from it.")
+        if not "common_stat" in stat_dict.keys(): raise ValueError(
+            "Input dictionary does not seem to be a proper statistics dictionary as common_stat-element is missing.")
+
+        common_stat_dict = stat_dict["common_stat"][0]
+
+        try:
+            return (common_stat_dict[stat_name])
+        except:
+            raise ValueError("Could not find " + stat_name + " in common_stat of input dictionary.")
+
+    def write_stat_json(self, path_out, file_id=-1):
+        """
+        Writes the statistics dictionary to a json-file (with file_id in the output name).
+        If file_id is passed (and greater than 0), parallelized operation on a slave node is assumed.
+        Else: the method is invoked from the master node, i.e. the final json-file is created.
+        """
+        if (self.mode == "loc"):
+            if int(file_id) <= 0: raise ValueError("Object is in loc-mode, but no valid file_id passed")
+            # json-file from slave node
+            js_file = os.path.join(path_out, 'stat_{0:0=2d}.json'.format(int(file_id)))
+        elif (self.mode == "master"):
+            if (int(file_id) > 0): print("Warning: Object is in master-mode, but file_id passed which will be ignored.")
+            # (final) json-file from master node
+            js_file = os.path.join(path_out, 'statistics.json')
+        else:
+            raise ValueError("Object seems to be initialized only, but no data has been processed so far.")
+
+        try:
+            with open(js_file, 'w') as stat_out:
+                json.dump(self.stat_dict, stat_out)
+        except ValueError:
+            print("Something went wrong when writing dictionary to json-file: '" + js_file + "''")
+        else:
+            print("Created statistics json-file '" + js_file + "' successfully.")
+
diff --git a/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py b/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py
index 2fb693e4066570a93889850c571654a46fb81e4e..7a61aa090f9e115ffd140a54dd0784dbbd35c48d 100644
--- a/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py
+++ b/video_prediction_savp/video_prediction/datasets/era5_dataset_v2.py
@@ -13,10 +13,11 @@ from video_prediction.datasets.base_dataset import VarLenFeatureVideoDataset
 from os import path
 import sys
 sys.path.append(path.abspath('../../workflow_parallel_frame_prediction/'))
-import DataPreprocess.process_netCDF_v2
-from DataPreprocess.process_netCDF_v2 import get_unique_vars
-from DataPreprocess.process_netCDF_v2 import Calc_data_stat
+import DataPreprocess.process_netCDF_v2
+from general_utils import get_unique_vars
+from statistics import Calc_data_stat
 from metadata import MetaData
+from normalization import Norm_data
 #from base_dataset import VarLenFeatureVideoDataset
 from collections import OrderedDict
 from tensorflow.contrib.training import HParams
@@ -161,94 +162,6 @@ def save_tf_record(output_fname, sequences,T_start_points):
 })
 example = tf.train.Example(features=features)
 writer.write(example.SerializeToString())
-
-class Norm_data:
-    """
-    Class for normalizing data. The statistical data for normalization (minimum, maximum, average, standard deviation etc.) is expected to be available from a statistics-dictionary
-    created with the calc_data_stat-class (see 'process_netCDF_v2.py'.
-    """
-
-    ### set known norms and the requested statistics (to be retrieved from statistics.json) here ###
-    known_norms = {}
-    known_norms["minmax"] = ["min","max"]
-    known_norms["znorm"] = ["avg","sigma"]
-
-    def __init__(self,varnames):
-        """Initialize the instance by setting the variable names to be handled and the status (for sanity checks only) as attributes."""
-        varnames_uni, _, nvars = get_unique_vars(varnames)
-
-        self.varnames = varnames_uni
-        self.status_ok= False
-
-    def check_and_set_norm(self,stat_dict,norm):
-        """
-        Checks if the statistics-dictionary provides the required data for selected normalization method and expands the instance's attributes accordingly.
-        Example: minmax-normalization requires the minimum and maximum value of a variable named var1.
-                 If the requested values are provided by the statistics-dictionary, the instance gets the attributes 'var1min' and 'var1max',respectively.
-        """
-
-        # some sanity checks
-        if not norm in self.known_norms.keys(): # valid normalization requested?
-            print("Please select one of the following known normalizations: ")
-            for norm_avail in self.known_norms.keys():
-                print(norm_avail)
-            raise ValueError("Passed normalization '"+norm+"' is unknown.")
-
-        if not all(items in stat_dict for items in self.varnames): # all variables found in dictionary?
-            print("Keys in stat_dict:")
-            print(stat_dict.keys())
-
-            print("Requested variables:")
-            print(self.varnames)
-            raise ValueError("Could not find all requested variables in statistics dictionary.")
-
-        # create all attributes for the instance
-        for varname in self.varnames:
-            for stat_name in self.known_norms[norm]:
-                #setattr(self,varname+stat_name,stat_dict[varname][0][stat_name])
-                setattr(self,varname+stat_name,Calc_data_stat.get_stat_vars(stat_dict,stat_name,varname))
-
-        self.status_ok = True # set status for normalization -> ready
-
-    def norm_var(self,data,varname,norm):
-        """
-        Performs given normalization on input data (given that the instance is already set up)
-        """
-
-        # some sanity checks
-        if not self.status_ok: raise ValueError("Norm_data-instance needs to be initialized and checked first.") # status ready?
-
-        if not norm in self.known_norms.keys(): # valid normalization requested?
-            print("Please select one of the following known normalizations: ")
-            for norm_avail in self.known_norms.keys():
-                print(norm_avail)
-            raise ValueError("Passed normalization '"+norm+"' is unknown.")
-
-        # do the normalization and return
-        if norm == "minmax":
-            return((data[...] - getattr(self,varname+"min"))/(getattr(self,varname+"max") - getattr(self,varname+"min")))
-        elif norm == "znorm":
-            return((data[...] - getattr(self,varname+"avg"))/getattr(self,varname+"sigma")**2)
-
-    def denorm_var(self,data,varname,norm):
-        """
-        Performs given denormalization on input data (given that the instance is already set up), i.e. inverse method to norm_var
-        """
-
-        # some sanity checks
-        if not self.status_ok: raise ValueError("Norm_data-instance needs to be initialized and checked first.") # status ready?
-
-        if not norm in self.known_norms.keys(): # valid normalization requested?
-            print("Please select one of the following known normalizations: ")
-            for norm_avail in self.known_norms.keys():
-                print(norm_avail)
-            raise ValueError("Passed normalization '"+norm+"' is unknown.")
-
-        # do the denormalization and return
-        if norm == "minmax":
-            return(data[...] * (getattr(self,varname+"max") - getattr(self,varname+"min")) + getattr(self,varname+"min"))
-        elif norm == "znorm":
-            return(data[...] * getattr(self,varname+"sigma")**2 + getattr(self,varname+"avg"))
 
 def read_frames_and_save_tf_records(stats,output_dir,input_file, temp_input_file, vars_in,year,month,seq_length=20,sequences_per_file=128,height=64,width=64,channels=3,**kwargs):#Bing: original 128
diff --git a/video_prediction_savp/video_prediction/datasets/moving_mnist.py b/video_prediction_savp/video_prediction/datasets/moving_mnist.py
index 12a517ac9540dd443866d47ea689803abe9e9a4d..ba8556859951e59cdec7d72e4319cb98cabe1ac5 100644
--- a/video_prediction_savp/video_prediction/datasets/moving_mnist.py
+++ b/video_prediction_savp/video_prediction/datasets/moving_mnist.py
@@ -14,8 +14,8 @@ from os import path
 import sys
 sys.path.append(path.abspath('../../workflow_parallel_frame_prediction/'))
 import DataPreprocess.process_netCDF_v2
-from DataPreprocess.process_netCDF_v2 import get_unique_vars
-from DataPreprocess.process_netCDF_v2 import Calc_data_stat
+from general_utils import get_unique_vars
+from statistics import Calc_data_stat
 from metadata import MetaData
 #from base_dataset import VarLenFeatureVideoDataset
 from collections import OrderedDict
diff --git a/workflow_parallel_frame_prediction/DataPreprocess/process_netCDF_v2.py b/workflow_parallel_frame_prediction/DataPreprocess/process_netCDF_v2.py
index 1a66fa51011174ab2d8adb2a4eb6b3b0ae3b4a7e..cf74a54f6ba1a8258bef004f8c50609816937d3f 100644
--- a/workflow_parallel_frame_prediction/DataPreprocess/process_netCDF_v2.py
+++ b/workflow_parallel_frame_prediction/DataPreprocess/process_netCDF_v2.py
@@ -5,6 +5,7 @@ Code for processing staged ERA5 data
 import os
 import glob
 from netCDF4 import Dataset,num2date
+from statistics import Calc_data_stat
 #import requests
 #from bs4 import BeautifulSoup
 #import urllib.request
@@ -98,242 +99,6 @@ def process_netCDF_in_dir(src_dir,**kwargs):
         process_data(directory_to_process=directory_to_process, **kwargs)
-
-# ML 2020/05/15 S
-def get_unique_vars(varnames):
-    vars_uni, varsind = np.unique(varnames,return_index = True)
-    nvars_uni = len(vars_uni)
-
-    return(vars_uni, varsind, nvars_uni)
-
-class Calc_data_stat:
-    """Class for computing statistics and saving them to a json-files."""
-
-    def __init__(self,nvars):
-        """
-        Initializes the instance for later use, i.e. initializes attributes with expected shape
-        """
-        self.stat_dict = {}
-        self.varmin = np.full((nvars,1),np.nan) # avoid rank one-arrays
-        self.varmax = np.full((nvars,1),np.nan)
-        self.varavg = np.zeros((nvars,1)) # second dimension acts as placeholder for averaging on master node collecting json-files from slave nodes
-        self.nfiles = [0] # number of processed files
-        self.mode = "" # mode to distinguish between processing on slave and master nodes (sanity check)
-        self.jsfiles = [""] # list of processed json-files (master-mode only!)
-
-    def acc_stat_loc(self,ivar,data):
-        """
-        Performs accumulation of all statistics while looping through all data files (i.e. updates the statistics) on slave nodes
-        """
-        if not self.mode:
-            self.mode = "loc"
-        elif self.mode == "master":
-            raise ValueError("Cannot switch to loc-mode during runtime...")
-        else:
-            pass
-
-        self.varmin[ivar] = np.fmin(self.varmin[ivar],np.amin(data))
-        self.varmax[ivar] = np.fmax(self.varmax[ivar],np.amax(data))
-        self.varavg[ivar,0] += np.average(data) # note that we sum the average -> readjustment required in the final step
-        if (ivar == 0): self.nfiles[0] += 1
-
-    def finalize_stat_loc(self,varnames):
-        """
-        Finalizes computation of statistics after going through all the data on slave nodes.
-        Afterwards the statistics dictionary is ready for being written in a json-file.
-        """
-
-        if self.mode != "loc":
-            raise ValueError("Object is not in loc-mode. Probably some master-method has been called previously.")
-
-        if self.stat_dict: raise ValueError("Statistics dictionary is not empty.")
-
-        vars_uni, varsind = np.unique(varnames,return_index=True)
-        nvars = len(vars_uni)
-
-        vars_uni, varsind, nvars = get_unique_vars(varnames)
-
-        varmin, varmax, varavg = self.varmin[varsind], self.varmax[varsind], self.varavg[varsind,0]
-
-        for i in range(nvars):
-            varavg[i] /= self.nfiles # for adjusting the (summed) average
-
-            self.stat_dict[vars_uni[i]]=[]
-            self.stat_dict[vars_uni[i]].append({
-                'min': varmin[i,0].tolist(),
-                'max': varmax[i,0].tolist(),
-                'avg': varavg[i].tolist()
-            })
-        self.stat_dict["common_stat"] = [
-            {"nfiles":self.nfiles[0]}]
-
-    def acc_stat_master(self,file_dir,file_id):
-        """
-        Opens statistics-file (created by slave nodes) and accumulates its content.
-        """
-
-        if (int(file_id) <= 0): raise ValueError("Non-valid file_id passed.")
-
-        if not self.mode:
-            self.mode = "master"
-        elif self.mode == "loc":
-            raise ValueError("Cannot switch to master-mode during runtime...")
-        else:
-            pass
-
-        # sanity check: check if dictionary is initialized with unique values only
-        if self.stat_dict.keys() > set(self.stat_dict.keys()):
-            raise ValueError("Initialized dictionary contains duplicates of variales. Need unique collection instead.")
-        else:
-            pass
-
-        file_name = os.path.join(file_dir,"stat_{0:0=2d}.json".format(int(file_id)))
-
-        if not file_name in self.jsfiles:
-            print("Try to open: '"+file_name+"'")
-
-            try:
-                with open(file_name) as js_file:
-                    dict_in = json.load(js_file)
-
-                    # sanity check
-                    if (len(dict_in.keys()) -1 != len(self.varmin)):
-                        raise ValueError("Different number of variables found in json-file '"+js_file+"' as expected from statistics object.")
-
-                    self.varmin = np.fmin(self.varmin,Calc_data_stat.get_stat_allvars(dict_in,"min"))
-                    self.varmax = np.fmax(self.varmax,Calc_data_stat.get_stat_allvars(dict_in,"max"))
-
-                    if (np.all(self.varavg == 0.) or self.nfiles[0] == 0):
-                        self.varavg = Calc_data_stat.get_stat_allvars(dict_in,"avg")
-                        self.nfiles[0] = Calc_data_stat.get_common_stat(dict_in,"nfiles")
-                        self.jsfiles[0]= file_name
-                    else:
-                        self.varavg = np.append(self.varavg,Calc_data_stat.get_stat_allvars(dict_in,"avg"),axis=1)
-                        self.nfiles.append(Calc_data_stat.get_common_stat(dict_in,"nfiles"))
-                        self.jsfiles.append(file_name)
-            except IOError:
-                print("Cannot handle statistics file '"+file_name+"' to be processed.")
-            except ValueError:
-                print("Cannot retireve all required statistics from '"+file_name+"'")
-        else:
-            print("Statistics file '"+file_name+"' has already been processed. Thus, just pass here...")
-            pass
-
-    def finalize_stat_master(self,vars_uni):
-        """
-        Performs final compuattion of statistics after accumulation from slave nodes.
-        """
-        if self.mode != "master":
-            raise ValueError("Object is not in master-mode. Probably some loc-method has been called previously.")
-
-        if len(vars_uni) > len(set(vars_uni)):
-            raise ValueError("Input variable names are not unique.")
-
-        nvars = len(vars_uni)
-        n_jsfiles = len(self.nfiles)
-        nfiles_all= np.sum(self.nfiles)
-        avg_wgt = np.array(self.nfiles,dtype=float)/float(nfiles_all)
-
-        varmin, varmax = self.varmin, self.varmax
-        varavg = np.sum(np.multiply(self.varavg,avg_wgt),axis=1) # calculate weighted average
-
-        for i in range(nvars):
-            self.stat_dict[vars_uni[i]]=[]
-            self.stat_dict[vars_uni[i]].append({
-                'min': varmin[i,0].tolist(),
-                'max': varmax[i,0].tolist(),
-                'avg': varavg[i].tolist()
-            })
-        self.stat_dict["common_stat"] = [
-            {"nfiles": int(nfiles_all),
-             "jsfiles": self.jsfiles
-             }]
-
-    @staticmethod
-    def get_stat_allvars(stat_dict,stat_name):
-        """
-        Unpacks statistics dictionary and returns values of stat_name of all variables contained in the dictionary.
-        """
-
-        # some sanity checks
-        if not stat_dict: raise ValueError("Input dictionary is still empty! Cannot access anything from it.")
-        if not "common_stat" in stat_dict.keys(): raise ValueError("Input dictionary does not seem to be a proper statistics dictionary as common_stat-element is missing.")
-
-        stat_dict_filter = (stat_dict).copy()
-        stat_dict_filter.pop("common_stat")
-
-        if not stat_dict_filter.keys(): raise ValueError("Input dictionary does not contain any variables.")
-
-        try:
-            varstat = np.array([stat_dict_filter[i][0][stat_name] for i in [*stat_dict_filter.keys()]])
-            if np.ndim(varstat) == 1: # avoid returning rank 1-arrays
-                return varstat.reshape(-1,1)
-            else:
-                return varstat
-        except:
-            raise ValueError("Could not find "+stat_name+" for all variables of input dictionary.")
-
-    @staticmethod
-    def get_stat_vars(stat_dict,stat_name,vars_in):
-        """
-        Retrieves requested statistics (stat_name) for all unique variables listed in allvars given statistics dictionary.
-        If more than one unique variable is processed, this method returns a list, whereas a scalar is returned else.
-        """
-
-        if not stat_dict: raise ValueError("Statistics dictionary is still empty! Cannot access anything from it.")
-        if not "common_stat" in stat_dict.keys(): raise ValueError("Input dictionary does not seem to be a proper statistics dictionary as common_stat-element is missing.")
-
-        vars_uni,indrev = np.unique(vars_in,return_inverse=True)
-
-        try:
-            if len(vars_uni) > 1:
-                return([stat_dict[var][0][stat_name] for var in vars_uni[indrev]])
-            else:
-                return(stat_dict[vars_uni[0]][0][stat_name])
-        except:
-            raise ValueError("Could not find "+stat_name+" for all variables of input dictionary.")
-
-    @staticmethod
-    def get_common_stat(stat_dict,stat_name):
-
-        if not stat_dict: raise ValueError("Input dictionary is still empty! Cannot access anything from it.")
-        if not "common_stat" in stat_dict.keys(): raise ValueError("Input dictionary does not seem to be a proper statistics dictionary as common_stat-element is missing.")
-
-        common_stat_dict = stat_dict["common_stat"][0]
-
-        try:
-            return(common_stat_dict[stat_name])
-        except:
-            raise ValueError("Could not find "+stat_name+" in common_stat of input dictionary.")
-
-
-    def write_stat_json(self,path_out,file_id = -1):
-        """
-        Writes statistics-dictionary of slave nodes to json-file (with job_id in the output name)
-        If file_id is passed (and greater than 0), parallelized peration on a slave node is assumed.
-        Else: method is invoked from master node, i.e. final json-file is created
-        """
-        if (self.mode == "loc"):
-            if int(file_id) <= 0: raise ValueError("Object is in loc-mode, but no valid file_id passed")
-            # json-file from slave node
-            js_file = os.path.join(path_out,'stat_{0:0=2d}.json'.format(int(file_id)))
-        elif (self.mode == "master"):
-            if (int(file_id) > 0): print("Warning: Object is master-mode, but file_id passed which will be ignored.")
-            # (final) json-file from master node
-            js_file = os.path.join(path_out,'statistics.json')
-        else:
-            raise ValueError("Object seems to be initialized only, but no data has been processed so far.")
-
-        try:
-            with open(js_file,'w') as stat_out:
-                json.dump(self.stat_dict,stat_out)
-        except ValueError:
-            print("Something went wrong when writing dictionary to json-file: '"+js_file+"''")
-        finally:
-            print("Created statistics json-file '"+js_file+"' successfully.")
-
-# ML 2020/05/15 E
-
-
 # ML 2020/08/03 Not used anymore!
 #def split_data_multiple_years(target_dir,partition,varnames):
 #"""
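
For orientation, a minimal sketch of how the relocated helpers interact after this patch, assuming video_prediction_savp/utils is on PYTHONPATH (as arranged in create_env.sh above) and a statistics.json previously written by Calc_data_stat; the file path, variable name and data below are illustrative only:

    import json
    import numpy as np
    from normalization import Norm_data  # resolved via video_prediction_savp/utils

    # statistics dictionary as written by Calc_data_stat.write_stat_json on the master node
    with open("statistics.json") as js_file:
        stat_dict = json.load(js_file)

    norm = Norm_data(["t2m"])                         # variable(s) to be normalized
    norm.check_and_set_norm(stat_dict, "minmax")      # fetches 't2mmin'/'t2mmax' from the dictionary
    data = np.random.rand(20, 64, 64)                 # stand-in for a sequence of ERA5 fields
    data_norm = norm.norm_var(data, "t2m", "minmax")  # scales to the [0, 1] range
    data_back = norm.denorm_var(data_norm, "t2m", "minmax")  # inverts the scaling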