Commit 2c9f7603 authored by Felix Kleinert
first draft of nc extractor

import xarray as xr
import glob
import os
import pandas as pd
import numpy as np
import dask.array as da
from dask.diagnostics import ProgressBar
import dask
import multiprocessing
import psutil
import logging
import tqdm


def get_files(path, start_time, end_time, search_pattern=None):
    """Collect all netCDF files in `path` whose names contain a date within [start_time, end_time]."""
    if search_pattern is None:
        search_pattern = ""
    path_list = []
    for day in pd.date_range(start_time, end_time):
        path_list += sorted(glob.glob(os.path.join(path, f"{search_pattern}*{day.strftime('%Y-%m-%d')}*.nc")))
    return path_list
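
# Usage sketch (hypothetical directory and pattern; file names are assumed to contain the date as
# YYYY-MM-DD, as required by the glob above):
# files = get_files("/data/wrfout/2009", "2009-01-01", "2009-01-04", search_pattern="wrfout_d01_")
# -> e.g. ["/data/wrfout/2009/wrfout_d01_2009-01-01_00:00:00.nc", ...], one sorted batch per day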


def cut_data(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icoord=(0, 10)):
    """Cut a WRF output dataset to the given south_north, west_east and bottom_top index ranges.

    Variables are grouped by their (centered or staggered) dimensions and cut accordingly; staggered
    dimensions use an index range extended by one grid point. The result is computed eagerly and the
    input dataset is closed.
    """
    def south_north_cut(data):
        return data.where(
            da.logical_and(sn_icoord[0] <= data.south_north, data.south_north <= sn_icoord[1]), drop=True)

    def west_east_cut(data):
        return data.where(
            da.logical_and(we_icoord[0] <= data.west_east, data.west_east <= we_icoord[1]), drop=True)

    def bottom_top_cut(data):
        return data.where(
            da.logical_and(bt_icoord[0] <= data.bottom_top, data.bottom_top <= bt_icoord[1]), drop=True)

    def south_north_stag_cut(data):
        return data.where(
            da.logical_and(sn_icoord[0] <= data.south_north_stag, data.south_north_stag <= sn_icoord[1] + 1), drop=True)

    def west_east_stag_cut(data):
        return data.where(
            da.logical_and(we_icoord[0] <= data.west_east_stag, data.west_east_stag <= we_icoord[1] + 1), drop=True)

    def bottom_top_stag_cut(data):
        return data.where(
            da.logical_and(bt_icoord[0] <= data.bottom_top_stag, data.bottom_top_stag <= bt_icoord[1] + 1), drop=True)

    # group variables by the dimensions they depend on
    time_vars = {d for d in data for i in data[d].dims if "XTIME" == i}
    south_north_vars = {d for d in data for i in data[d].dims if "south_north" == i}
    west_east_vars = {d for d in data for i in data[d].dims if "west_east" == i}
    bottom_top_vars = {d for d in data for i in data[d].dims if "bottom_top" == i}
    south_north_stag_vars = {d for d in data for i in data[d].dims if "south_north_stag" == i}
    west_east_stag_vars = {d for d in data for i in data[d].dims if "west_east_stag" == i}
    bottom_top_stag_vars = {d for d in data for i in data[d].dims if "bottom_top_stag" == i}

    center_vars3D = south_north_vars & west_east_vars & bottom_top_vars
    center_vars2D = (south_north_vars & west_east_vars) - bottom_top_vars - bottom_top_stag_vars
    center_vars1D_vert = bottom_top_vars - south_north_vars - west_east_vars
    scalars = time_vars - center_vars3D - center_vars2D - center_vars1D_vert - south_north_stag_vars - \
        west_east_stag_vars - bottom_top_stag_vars

    # cut variables defined on the mass (center) grid
    center_data = data[list(center_vars3D) + list(center_vars2D) + list(center_vars1D_vert)]
    center_data = south_north_cut(center_data)
    center_data = west_east_cut(center_data)
    center_data2D = center_data[list(center_vars2D)].copy()
    center_data1_3D = bottom_top_cut(center_data[list(center_vars3D) + list(center_vars1D_vert)])

    scalar_data = data[list(scalars)].copy()

    # cut variables staggered in south_north, west_east or bottom_top
    sn_stag_data = data[list(south_north_stag_vars)]
    sn_stag_data = south_north_stag_cut(sn_stag_data)
    sn_stag_data = west_east_cut(sn_stag_data)
    sn_stag_data = bottom_top_cut(sn_stag_data)

    we_stag_data = data[list(west_east_stag_vars)]
    we_stag_data = south_north_cut(we_stag_data)
    we_stag_data = west_east_stag_cut(we_stag_data)
    we_stag_data = bottom_top_cut(we_stag_data)

    bt_stag_data = data[list(bottom_top_stag_vars)]
    bt_stag_data = south_north_cut(bt_stag_data)
    bt_stag_data = west_east_cut(bt_stag_data)
    bt_stag_data = bottom_top_stag_cut(bt_stag_data)

    # merge the cut groups back into a single dataset
    data_cut = center_data1_3D.update(center_data2D).update(scalar_data).update(sn_stag_data).update(we_stag_data).update(bt_stag_data)
    assert len(data) == len(data_cut)
    data_cut = data_cut.compute()
    try:
        data.close()
    except Exception:
        pass
    return data_cut
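
# Minimal end-to-end sketch for a single file (hypothetical file names; the lazy chunks="auto" load
# mirrors f_proc below, and .compute() inside cut_data materialises the result):
# ds = xr.open_dataset("wrfout_d01_2009-01-01_00:00:00.nc", chunks="auto")
# ds_cut = cut_data(ds, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icoord=(0, 10))
# ds_cut.to_netcdf("wrfout_d01_2009-01-01_cut.nc")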


def cut_data_coords(data, sn_icoord=(130, 210), we_icoord=(160, 220), bt_icoord=(0, 10)):
    """Variant of `cut_data` for the coordinate file.

    The time dimension is named "Time" (instead of "XTIME"), no scalar variables are carried over,
    and some of the cuts applied to the full output files are disabled (see the commented-out lines).
    """
    def south_north_cut(data):
        return data.where(
            da.logical_and(sn_icoord[0] <= data.south_north, data.south_north <= sn_icoord[1]), drop=True)

    def west_east_cut(data):
        return data.where(
            da.logical_and(we_icoord[0] <= data.west_east, data.west_east <= we_icoord[1]), drop=True)

    def bottom_top_cut(data):
        return data.where(
            da.logical_and(bt_icoord[0] <= data.bottom_top, data.bottom_top <= bt_icoord[1]), drop=True)

    def south_north_stag_cut(data):
        return data.where(
            da.logical_and(sn_icoord[0] <= data.south_north_stag, data.south_north_stag <= sn_icoord[1] + 1), drop=True)

    def west_east_stag_cut(data):
        return data.where(
            da.logical_and(we_icoord[0] <= data.west_east_stag, data.west_east_stag <= we_icoord[1] + 1), drop=True)

    def bottom_top_stag_cut(data):
        return data.where(
            da.logical_and(bt_icoord[0] <= data.bottom_top_stag, data.bottom_top_stag <= bt_icoord[1] + 1), drop=True)

    # group variables by the dimensions they depend on
    time_vars = {d for d in data for i in data[d].dims if "Time" == i}
    south_north_vars = {d for d in data for i in data[d].dims if "south_north" == i}
    west_east_vars = {d for d in data for i in data[d].dims if "west_east" == i}
    bottom_top_vars = {d for d in data for i in data[d].dims if "bottom_top" == i}
    south_north_stag_vars = {d for d in data for i in data[d].dims if "south_north_stag" == i}
    west_east_stag_vars = {d for d in data for i in data[d].dims if "west_east_stag" == i}
    bottom_top_stag_vars = {d for d in data for i in data[d].dims if "bottom_top_stag" == i}

    center_vars3D = south_north_vars & west_east_vars & bottom_top_vars
    center_vars2D = (south_north_vars & west_east_vars) - bottom_top_vars - bottom_top_stag_vars
    center_vars1D_vert = bottom_top_vars - south_north_vars - west_east_vars
    scalars = time_vars - center_vars3D - center_vars2D - center_vars1D_vert - south_north_stag_vars - \
        west_east_stag_vars - bottom_top_stag_vars

    # cut variables defined on the mass (center) grid
    center_data = data[list(center_vars3D) + list(center_vars2D) + list(center_vars1D_vert)]
    center_data = south_north_cut(center_data)
    center_data = west_east_cut(center_data)
    center_data2D = center_data[list(center_vars2D)].copy()
    center_data1_3D = bottom_top_cut(center_data[list(center_vars3D) + list(center_vars1D_vert)])
    # scalar_data = data[list(scalars)].copy()

    # cut variables staggered in south_north, west_east or bottom_top
    sn_stag_data = data[list(south_north_stag_vars)]
    sn_stag_data = south_north_stag_cut(sn_stag_data)
    sn_stag_data = west_east_cut(sn_stag_data)
    # sn_stag_data = bottom_top_cut(sn_stag_data)

    we_stag_data = data[list(west_east_stag_vars)]
    we_stag_data = south_north_cut(we_stag_data)
    we_stag_data = west_east_stag_cut(we_stag_data)
    # we_stag_data = bottom_top_cut(we_stag_data)

    bt_stag_data = data[list(bottom_top_stag_vars)]
    # bt_stag_data = south_north_cut(bt_stag_data)
    # bt_stag_data = west_east_cut(bt_stag_data)
    bt_stag_data = bottom_top_stag_cut(bt_stag_data)

    # merge the cut groups back into a single dataset
    data_cut = center_data1_3D.update(center_data2D).update(sn_stag_data).update(we_stag_data).update(bt_stag_data)
    assert len(data) == len(data_cut)
    data_cut = data_cut.compute()
    try:
        data.close()
    except Exception:
        pass
    return data_cut


def f_proc(file, new_file):
    """Open one netCDF file, cut it to the target domain and write the result to `new_file`."""
    data = xr.open_dataset(file, chunks="auto")
    if os.path.basename(file) == "coords.nc":
        # the coordinate file needs special treatment: temporarily demote its coordinates to data
        # variables so they can be cut like any other variable, then restore them afterwards
        coords = data.coords
        data = data.reset_coords()
        d = cut_data_coords(data)
        d = d.set_coords(coords.keys())
    else:
        d = cut_data(data)
    d.to_netcdf(new_file)
    return 0
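
# Example call with hypothetical paths (a file named "coords.nc" takes the cut_data_coords branch):
# f_proc("/data/2009/wrfout_d01_2009-01-01_00:00:00.nc", "/data/cut/wrfout_d01_2009-01-01_00:00:00.nc")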


def run_apply_async_multiprocessing(func, argument_list, num_processes):
    """Run `func(file, new_file)` for all file pairs in `argument_list` using a multiprocessing pool."""
    pool = multiprocessing.Pool(num_processes)
    jobs = [pool.apply_async(func=func, args=(file, new_file)) for file, new_file in zip(*argument_list)]
    pool.close()
    result_list_tqdm = []
    for job in tqdm.tqdm(jobs):
        result_list_tqdm.append(job.get())
    return result_list_tqdm


if __name__ == "__main__":
    path = "/home/felix/Data/WRF-Chem/upload_aura_2021-02-24/2009"
    new_path = "/home/felix/Data/WRF-Chem/test_cut_nc/"
    start_time = "2009-01-01"
    end_time = "2009-01-04"

    # cut the coordinate file first
    coords_file = glob.glob(os.path.join(os.path.split(path)[0], "coords.nc"))
    coords_file_new = [os.path.join(new_path, os.path.basename(p)) for p in coords_file]
    f_proc(coords_file[0], coords_file_new[0])

    # cut all daily output files in parallel
    path_list = get_files(path, start_time, end_time)
    path_list_new = [os.path.join(new_path, os.path.basename(p)) for p in path_list]
    print(f"found {len(path_list)} files")
    num_processes = min([psutil.cpu_count(logical=False), len(path_list), 16])
    # result_list = run_apply_async_multiprocessing(func=f_proc, argument_list=(path_list, path_list_new), num_processes=num_processes)
    # progress_bar = tqdm.tqdm(total=len(path_list))
    pool = multiprocessing.Pool(num_processes)
    logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
    output = [pool.apply_async(f_proc, args=(file, new_file)) for file, new_file in zip(path_list, path_list_new)]
    # wait for all workers to finish before the script exits
    results = [job.get() for job in output]
    pool.close()
    pool.join()
    # for file, new_file in zip(path_list, path_list_new):
    #     d = cut_data(file)
    #     d.to_netcdf(new_file)
    print()