diff --git a/mlair/helpers/extract_from_ncfile.py b/mlair/helpers/extract_from_ncfile.py
index 9cc18eccc0ae01a25eca3f85dde9a7f5ea162a0a..c24619a0e09d65dd24f2cfaaa7c913d9211c9be2 100644
--- a/mlair/helpers/extract_from_ncfile.py
+++ b/mlair/helpers/extract_from_ncfile.py
@@ -21,7 +21,7 @@ def get_files(path, start_time, end_time, search_pattern=None):
     return path_list
 
 
-def cut_data(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(0, 10)):
+def cut_data(file, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(0, 10)):
     def south_north_cut(data):
         return data.where(
             da.logical_and(sn_icoord[0] <= data.south_north, data.south_north <= sn_icoord[1]), drop=True)
@@ -46,7 +46,7 @@ def cut_data(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(0, 10))
         return data.where(
             da.logical_and(bt_icood[0] <= data.bottom_top_stag, data.bottom_top_stag <= bt_icood[1] + 1), drop=True)
 
-    # data = xr.open_dataset(file, chunks="auto")
+    data = xr.open_dataset(file)  # , chunks="auto")
 
     time_vars = {d for d in data for i in data[d].dims if "XTIME" == i}
     south_north_vars = {d for d in data for i in data[d].dims if "south_north" == i}
@@ -98,7 +98,7 @@ def cut_data(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(0, 10))
     return data_cut
 
 
-def cut_data_coords(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(0, 10)):
+def cut_data_coords(file, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(0, 10)):
     def south_north_cut(data):
         return data.where(
             da.logical_and(sn_icoord[0] <= data.south_north, data.south_north <= sn_icoord[1]), drop=True)
@@ -123,7 +123,9 @@ def cut_data_coords(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(
         return data.where(
             da.logical_and(bt_icood[0] <= data.bottom_top_stag, data.bottom_top_stag <= bt_icood[1] + 1), drop=True)
 
-    # data = xr.open_dataset(file, chunks="auto")
+    data = xr.open_dataset(file, chunks="auto")
+    coords = data.coords
+    data = data.reset_coords()
 
     time_vars = {d for d in data for i in data[d].dims if "Time" == i}
     south_north_vars = {d for d in data for i in data[d].dims if "south_north" == i}
@@ -167,6 +169,8 @@ def cut_data_coords(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(
     assert len(data) == len(data_cut)
 
     data_cut = data_cut.compute()
+
+    data_cut = data_cut.set_coords(coords.keys())
     try:
         data.close()
     except:
@@ -175,16 +179,17 @@ def cut_data_coords(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icood=(
 
 
 def f_proc(file, new_file):
-    data = xr.open_dataset(file, chunks="auto")
-    if os.path.basename(file) == "coords.nc":
-        coords = data.coords
-        data = data.reset_coords()
-        d = cut_data_coords(data)
-        d = d.set_coords(coords.keys())
-    else:
-        d = cut_data(data)
+    d = cut_data(file)
+    d.to_netcdf(new_file)
+    logging.info(f"created new_file: {new_file}")
+    return new_file
+
+
+def f_proc_coords(file, new_file):
+    d = cut_data_coords(file)
     d.to_netcdf(new_file)
-    return 0
+    logging.info(f"created new_file: {new_file}")
+    return True
 
 
 def run_apply_async_multiprocessing(func, argument_list, num_processes):
@@ -207,22 +212,22 @@ if __name__ == "__main__":
 
     coords_file = glob.glob(os.path.join(os.path.split(path)[0], "coords.nc"))
     coords_file_new = [os.path.join(new_path, os.path.basename(p)) for p in coords_file]
-    f_proc(coords_file[0], coords_file_new[0])
-
+    c = f_proc_coords(coords_file[0], coords_file_new[0])
 
     path_list = get_files(path, start_time, end_time)
     path_list_new = [os.path.join(new_path, os.path.basename(p)) for p in path_list]
 
     print(f"found {len(path_list)} files")
 
     num_processes = min([psutil.cpu_count(logical=False), len(path_list), 16])
 
-    # result_list = run_apply_async_multiprocessing(func=f_proc, argument_list=(path_list, path_list_new), num_processes=num_processes)
+    result_list = run_apply_async_multiprocessing(func=f_proc, argument_list=(path_list, path_list_new), num_processes=7)
     # progress_bar = tqdm.tqdm(total=len(path_list))  #
-    pool = multiprocessing.Pool(min([psutil.cpu_count(logical=False), len(path_list), 16]))
-    logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
-    output = [pool.apply_async(f_proc, args=(file, new_file)) for file, new_file in zip(path_list, path_list_new)]
-
+    # pool = multiprocessing.Pool(min([psutil.cpu_count(logical=False), len(path_list), 16]))
+    # logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
+    # output = [pool.apply_async(f_proc, args=(file, new_file)) for file, new_file in zip(path_list, path_list_new)]
+    # for i, p in enumerate(output):
+    #     logging.info(f"...finished: {p} ({int((i + 1.) / len(output) * 100)}%)")
 
     # for file, new_file in zip(path_list, path_list_new):
     #     d = cut_data(file)
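
Note: the body of run_apply_async_multiprocessing lies outside the hunks above; only its signature appears as hunk context. For readers of this diff, a minimal sketch of a compatible helper follows. It assumes that argument_list is a tuple of equal-length lists (here path_list and path_list_new) that gets zipped into per-call argument tuples, and that progress is reported via tqdm, mirroring the commented-out progress_bar line; both details are assumptions, not confirmed by the diff.

import multiprocessing

import tqdm


def run_apply_async_multiprocessing(func, argument_list, num_processes):
    # Sketch only: zip the parallel lists into (file, new_file) tuples and
    # dispatch them to a process pool, updating a progress bar per result.
    pool = multiprocessing.Pool(processes=num_processes)
    progress_bar = tqdm.tqdm(total=len(argument_list[0]))

    def update(_result):
        # apply_async callbacks run in the parent process, so it is safe
        # to touch the progress bar here.
        progress_bar.update()

    jobs = [pool.apply_async(func, args=args, callback=update)
            for args in zip(*argument_list)]
    pool.close()
    result_list = [job.get() for job in jobs]  # .get() re-raises worker errors
    pool.join()
    progress_bar.close()
    return result_list

With this shape, the result_list assigned in __main__ collects the new_file paths returned by f_proc for each processed input file.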