# CIAO_s3_remote_read.py
# Reads windowed subsections of CIAO simulation HDF5 output directly from S3.
import glob
import os
# import boto3
# import botocore
import h5py
import numpy as np
print(h5py.__version__)  # NOTE(review): debug leftover -- prints the h5py version (ros3 driver support depends on the h5py build)
import sys
import argparse
import time
import boto3
import json
import multiprocessing as mp
######################################################################
########################Function Definition###########################
######################################################################
#Section required for the proper functioning of multiprocessing
class s3_details:
    """S3 connection details plus the read window: which centre element to
    target and how many elements either side of it to download per axis.

    Instances are handed to multiprocessing workers, so all state needed by
    read_variable is captured in the constructor.
    """

    def __init__(self, s3_endpoint, bucket_name, access_id, access_key, region, dimension, target, window_width, prefix):
        # S3 connection parameters.
        self.s3_endpoint = s3_endpoint
        self.bucket_name = bucket_name
        self.access_id = access_id
        self.access_key = access_key
        self.region = region
        # dimension: 1 -> 2D dataset, >1 -> 3D dataset (taken from dim.json).
        self.dimension = dimension
        # target: 3-element index of the window's centre element.
        self.target = target
        # window_width: elements to read either side of the centre, per axis.
        self.window_width = window_width
        # prefix: file-name prefix; remote files are "<prefix>_<variable>.h5".
        self.prefix = prefix

    def read_variable(self, column_to_read):
        """Download a window of one variable from the remote HDF5 file.

        Opens "<prefix>_<column_to_read>.h5" in the bucket via h5py's ros3
        driver and reads only the elements within window_width of the target
        centre element, for either 2D (dimension == 1) or 3D (dimension > 1)
        data as recorded in dim.json.

        Returns the downloaded array.  Raises ValueError if dimension < 1
        (previously that case fell through both branches and crashed with
        UnboundLocalError at the return).
        NOTE(review): a target closer than window_width to the start of an
        axis makes the slice start negative, which wraps and can return an
        unexpected (possibly empty) window -- confirm targets stay away from
        the dataset edges.
        """
        url = self.s3_endpoint + "/" + self.bucket_name + "/" + self.prefix + "_" + column_to_read + ".h5"
        print(url)
        if self.dimension < 1:
            raise ValueError("dimension must be >= 1, got %r" % (self.dimension,))
        # Window bounds around the target centre, one pair per axis.
        lo = [t - self.window_width for t in self.target]
        hi = [t + self.window_width + 1 for t in self.target]
        # h5py's ros3 driver expects the credentials and region as bytes.
        with h5py.File(url, driver='ros3',
                       secret_id=bytes(self.access_id, encoding='utf-8'),
                       secret_key=bytes(self.access_key, encoding='utf-8'),
                       aws_region=bytes(self.region, encoding='utf-8')) as f:
            if self.dimension == 1:  # 2D data: first axis is a singleton
                data_part = f[column_to_read][0, lo[1]:hi[1], lo[2]:hi[2]]
            else:  # 3D data
                data_part = f[column_to_read][lo[0]:hi[0], lo[1]:hi[1], lo[2]:hi[2]]
        return data_part
######################################################################
######################S3 Connection Details###########################
######################################################################
s3_endpoint = "https://s3-coec.jsc.fz-juelich.de" #S3 endpoint to connect to. As supplied, this is the best available for CoEC
bucket_name = "" #Set bucket to target. Script expects all files inside to be from the same simulation
access_id = "" #Set access_id for the S3 account
access_key = "" #Set secret access_key for the S3 account
region = "us-east-1" #Set region for the S3 endpoint
######################################################################
##########################Target Window###############################
######################################################################
window_width= 20 #Number of elements either side of the target element to download in each dimension
target_type = "manual" #Use a single target element for all frames of simulation output
#target_type = "manual_list" #Manually define a list of target elements, one per simulation output frame
#target_type = "auto" #Use the pre-generated hottest-element list (T_max_arg_list.json) for all frames
target = (201,201,201) # If using the target_type "manual" option, the target element used for every frame
manual_target=[] #If using "manual_list", fill with one 3-element target per entry in prefix_list
######################################################################
#####################Functional Section###############################
######################################################################
##########################Read Metadata ##############################
def _load_json(path):
    # Small helper: parse one metadata .json file and return its contents.
    with open(path, 'r') as handle:
        return json.load(handle)

prefix_list = _load_json("prefix_list.json")        # list of file-name prefixes, one per simulation frame
var_list = _load_json("var_list.json")              # list of variable names to download
dim = _load_json("dim.json")                        # dimensions of the SC dataset variables
T_max_arg_list = _load_json("T_max_arg_list.json")  # hottest element of each simulation frame
dimension = dim[0]  # first entry distinguishes 2D (1) from 3D (>1) simulations
#############################Targeting###############################
if target_type == "auto":
    # Use the pre-generated hottest-element list, one entry per frame.
    print("Target list populated from .json file")
    target_list = T_max_arg_list
elif target_type == "manual":
    # Repeat the single manually chosen element for every frame.
    print("Target list populated from single target element")
    target_list = [target] * len(prefix_list)
elif target_type == "manual_list":
    # Caller must supply manual_target with one 3-element entry per file.
    print("Target list populated from manually supplied element list")
    target_list = manual_target
else:
    # Previously an unrecognised target_type left target_list undefined and
    # the script died later with a NameError; fail fast and clearly instead.
    raise ValueError("Unknown target_type: %r" % (target_type,))
#############################Download###############################
# One worker per variable so all variables of a frame download in parallel.
# NOTE(review): creating a Pool at module level assumes the "fork" start
# method; under "spawn" (Windows/macOS default) this script would need an
# if __name__ == "__main__": guard -- confirm the target platform.
# The context manager guarantees the pool is torn down even if a download
# raises (the old explicit pool.close() was skipped on any exception).
with mp.Pool(len(var_list)) as pool:
    for count, prefix in enumerate(prefix_list):
        print("Prefix being read is " + prefix)
        target = target_list[count]  # centre element for this frame
        # Bundle connection and window details for the worker processes.
        s3file = s3_details(s3_endpoint, bucket_name, access_id, access_key,
                            region, dimension, target, window_width, prefix)
        #start_time = time.time()  # Start timing if benchmarking
        # Download all variables of this frame in parallel.
        data_payload = np.array(pool.map(s3file.read_variable, var_list))
        #print(f"\n elapsed time is %f" % (time.time()-start_time))  # Print timing if benchmarking
        #print("number of elements is " + str(data_payload.size))  # Print size if benchmarking
        # One output array per input file; variables stacked in var_list order.
        np.save(prefix + "_out", data_payload)
        # Free the (potentially very large) payload before the next frame.
        del data_payload