Commit 4f41a384 authored by Felix Kleinert

first draft of nc extractor fixed without chunks for dataset

parent 2c9f7603
1 merge request: !259 Draft: Resolve "WRF-Datahandler should inherit from SingleStationDatahandler"
Pipeline #65144 passed
@@ -21,7 +21,7 @@ def get_files(path, start_time, end_time, search_pattern=None):
     return path_list
-def cut_data(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(0, 10)):
+def cut_data(file, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(0, 10)):
     def south_north_cut(data):
         return data.where(
             da.logical_and(sn_icoord[0] <= data.south_north, data.south_north <= sn_icoord[1]), drop=True)
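
For reference, the index-range cropping used by the nested cut helpers can be reproduced on any xarray Dataset. The following is a minimal, self-contained sketch of that pattern; the toy variable name "T2" and the 300x300 grid are made up here, only the dimension name and the default bounds come from the code above.

import numpy as np
import xarray as xr
import dask.array as da

# toy dataset with a "south_north" index coordinate, standing in for a WRF file
data = xr.Dataset(
    {"T2": (("south_north", "west_east"), np.random.rand(300, 300))},
    coords={"south_north": np.arange(300), "west_east": np.arange(300)},
)

sn_icoord = (130, 210)

# keep only grid rows whose south_north index lies inside the closed interval;
# drop=True removes the fully masked rows instead of filling them with NaN
cut = data.where(
    da.logical_and(sn_icoord[0] <= data.south_north, data.south_north <= sn_icoord[1]),
    drop=True,
)
assert cut.sizes["south_north"] == sn_icoord[1] - sn_icoord[0] + 1
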
@@ -46,7 +46,7 @@ def cut_data(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(0, 10))
         return data.where(
             da.logical_and(bt_icood[0] <= data.bottom_top_stag, data.bottom_top_stag <= bt_icood[1] + 1), drop=True)
-    # data = xr.open_dataset(file, chunks="auto")
+    data = xr.open_dataset(file)#, chunks="auto")
     time_vars = {d for d in data for i in data[d].dims if "XTIME" == i}
     south_north_vars = {d for d in data for i in data[d].dims if "south_north" == i}
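
The functional change in this hunk is that cut_data now opens the file itself and does so without dask chunks. As a rough, self-contained illustration of the difference (file name and variable name are placeholders, not from this repository):

import numpy as np
import xarray as xr

# write a tiny placeholder file so the example is self-contained
xr.Dataset({"T2": (("Time", "south_north", "west_east"), np.random.rand(2, 4, 4))}).to_netcdf("example.nc")

# without chunks (new behaviour in cut_data): values come back as plain numpy arrays when accessed
eager = xr.open_dataset("example.nc")
print(type(eager["T2"].data))        # numpy.ndarray

# with chunks="auto" (the commented-out variant): dask-backed, stays lazy until compute()/load()
lazy = xr.open_dataset("example.nc", chunks="auto")
print(type(lazy["T2"].data))         # dask.array.core.Array
print(lazy["T2"].mean().compute())   # work happens only at compute()

eager.close()
lazy.close()
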
@@ -98,7 +98,7 @@ def cut_data(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(0, 10))
     return data_cut
-def cut_data_coords(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(0, 10)):
+def cut_data_coords(file, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(0, 10)):
     def south_north_cut(data):
         return data.where(
             da.logical_and(sn_icoord[0] <= data.south_north, data.south_north <= sn_icoord[1]), drop=True)
@@ -123,7 +123,9 @@ def cut_data_coords(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(
         return data.where(
             da.logical_and(bt_icood[0] <= data.bottom_top_stag, data.bottom_top_stag <= bt_icood[1] + 1), drop=True)
-    # data = xr.open_dataset(file, chunks="auto")
+    data = xr.open_dataset(file, chunks="auto")
+    coords = data.coords
+    data = data.reset_coords()
     time_vars = {d for d in data for i in data[d].dims if "Time" == i}
     south_north_vars = {d for d in data for i in data[d].dims if "south_north" == i}
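
cut_data_coords now records the coordinate variables, demotes them before cutting, and (in the next hunk) promotes them back afterwards. A standalone sketch of that round-trip on a toy dataset; the variable names "XLAT"/"XLONG" are only illustrative:

import numpy as np
import xarray as xr

ds = xr.Dataset(
    {"XLAT": (("south_north", "west_east"), np.random.rand(4, 4))},
    coords={"XLONG": (("south_north", "west_east"), np.random.rand(4, 4))},
)

coords = ds.coords                 # remember which variables were coordinates
ds = ds.reset_coords()             # demote them to ordinary data variables
assert "XLONG" in ds.data_vars     # ...so the same cutting code treats all variables alike

# (cutting / computing would happen here)

ds = ds.set_coords(coords.keys())  # promote them back to coordinates afterwards
assert "XLONG" in ds.coords
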
@@ -167,6 +169,8 @@ def cut_data_coords(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(
     assert len(data) == len(data_cut)
     data_cut = data_cut.compute()
+    data_cut = data_cut.set_coords(coords.keys())
     try:
         data.close()
     except:
@@ -175,16 +179,17 @@ def cut_data_coords(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(
 def f_proc(file, new_file):
-    data = xr.open_dataset(file, chunks="auto")
-    if os.path.basename(file) == "coords.nc":
-        coords = data.coords
-        data = data.reset_coords()
-        d = cut_data_coords(data)
-        d = d.set_coords(coords.keys())
-    else:
-        d = cut_data(data)
+    d = cut_data(file)
     d.to_netcdf(new_file)
     logging.info(f"created new_file: {new_file}")
     return new_file
 def f_proc_coords(file, new_file):
     d = cut_data_coords(file)
     d.to_netcdf(new_file)
-    return 0
+    logging.info(f"created new_file: {new_file}")
+    return True
 def run_apply_async_multiprocessing(func, argument_list, num_processes):
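
After the refactor the per-file worker boils down to "cut, write, log, return". A self-contained miniature of that flow, with a stand-in cut step (toy_cut, toy_f_proc and the file names are hypothetical; the real cut_data needs the WRF grid indices above):

import logging
import numpy as np
import xarray as xr

logging.basicConfig(level=logging.INFO)

def toy_cut(ds):
    # stand-in for cut_data: keep only the first two rows/columns of the grid
    return ds.isel(south_north=slice(0, 2), west_east=slice(0, 2))

def toy_f_proc(file, new_file):
    d = toy_cut(xr.open_dataset(file))
    d.to_netcdf(new_file)                       # write the cropped dataset
    logging.info(f"created new_file: {new_file}")
    return new_file

xr.Dataset({"T2": (("south_north", "west_east"), np.random.rand(4, 4))}).to_netcdf("in.nc")
print(toy_f_proc("in.nc", "out.nc"))            # -> "out.nc"
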
@@ -207,22 +212,22 @@ if __name__ == "__main__":
     coords_file = glob.glob(os.path.join(os.path.split(path)[0], "coords.nc"))
     coords_file_new = [os.path.join(new_path, os.path.basename(p)) for p in coords_file]
-    f_proc(coords_file[0], coords_file_new[0])
+    c = f_proc_coords(coords_file[0], coords_file_new[0])
     path_list = get_files(path, start_time, end_time)
     path_list_new = [os.path.join(new_path, os.path.basename(p)) for p in path_list]
     print(f"found {len(path_list)} files")
     num_processes = min([psutil.cpu_count(logical=False), len(path_list), 16])
-    # result_list = run_apply_async_multiprocessing(func=f_proc, argument_list=(path_list, path_list_new), num_processes=num_processes)
+    result_list = run_apply_async_multiprocessing(func=f_proc, argument_list=(path_list, path_list_new), num_processes=7)
     # progress_bar = tqdm.tqdm(total=len(path_list))
     #
-    pool = multiprocessing.Pool(min([psutil.cpu_count(logical=False), len(path_list), 16]))
-    logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
-    output = [pool.apply_async(f_proc, args=(file, new_file)) for file, new_file in zip(path_list, path_list_new)]
+    # pool = multiprocessing.Pool(min([psutil.cpu_count(logical=False), len(path_list), 16]))
+    # logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
+    # output = [pool.apply_async(f_proc, args=(file, new_file)) for file, new_file in zip(path_list, path_list_new)]
     # for i, p in enumerate(output):
     # logging.info(f"...finished: {p} ({int((i + 1.) / len(output) * 100)}%)")
     # for file, new_file in zip(path_list, path_list_new):
     # d = cut_data(file)
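
The commented-out block above shows the raw Pool.apply_async pattern that run_apply_async_multiprocessing presumably wraps; its body is outside this diff, so the version below is a generic, simplified sketch under that assumption (run_apply_async and square are hypothetical names, and the real helper takes two argument lists, one of input and one of output paths):

import multiprocessing

def square(x):
    # stand-in worker; in the script the worker is f_proc(file, new_file)
    return x * x

def run_apply_async(func, argument_list, num_processes):
    # submit every call asynchronously, then collect the results in submission order
    with multiprocessing.Pool(processes=num_processes) as pool:
        jobs = [pool.apply_async(func, args=(arg,)) for arg in argument_list]
        return [job.get() for job in jobs]

if __name__ == "__main__":
    print(run_apply_async(square, range(8), num_processes=4))  # [0, 1, 4, 9, ...]
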