Commit 114a30fe authored by mova

make the preprocessing sequence stop when done

parent 963d6fcc
@@ -67,7 +67,7 @@ loader_options:
   validation_set_size: 5000
   test_set_size: 30000
   prefetch_batches: 2
-  num_workers_transform: 30
+  num_workers_transform: 20
   num_workers_stack: 2
   hgcal:
     qf_seq_name: "hgcal_seq"
......
@@ -85,23 +85,13 @@ def magic_do_nothing(batch: GraphType) -> GraphType:
 def process_seq():
     return (
         qf.ProcessStep(read_chunk, 2, name="read_chunk"),
-        # Queue(1),
-        # # The input is now [(x, y), ... (x [300 * 51 * 51 * 25], y [300, 1]), (x, y)].
-        # # For these elements to be processed by each of the workers in the following
-        # # transform, they need to be (x [51 * 51 * 25], y [1]):
         qf.PoolStep(
             event_to_graph,
             nworkers=conf.loader.num_workers_transform,
             name="transform",
         ),
-        # Queue(1),
-        # qf.RepackStep(conf.loader.batch_size),
+        qf.RepackStep(conf.loader.batch_size),
         qf.ProcessStep(geo_batch, 1, name="geo_batch"),
-        # qf.ProcessStep(
-        #     split_layer_subgraphs,
-        #     conf.loader.num_workers_stack,
-        #     name="split_layer_subgraphs",
-        # ),
         qf.ProcessStep(add_sparse_adj_mtx, 1, name="add_sparse_adj_mtx"),
         # Needed for outputs to stay in order.
         qf.ProcessStep(
......
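The functional change in this hunk is that `qf.RepackStep` moves from a commented-out line into the live sequence: the pool workers emit one graph per event, and the repack step regroups that flat stream into lists of `conf.loader.batch_size` graphs before `geo_batch` builds batched graphs from them. The qf step classes are this repo's own queue machinery, so purely as an illustration, here is a minimal generator-based sketch of the repacking idea over plain Python iterables:

from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def repack(stream: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Regroup a flat stream of items into lists of length batch_size."""
    buffer: List[T] = []
    for item in stream:
        buffer.append(item)
        if len(buffer) == batch_size:
            yield buffer
            buffer = []
    if buffer:
        # Emit the smaller leftover list at the end of the stream.
        yield buffer

# Hypothetical usage mirroring the pipeline: events -> graphs -> batches.
# batches = repack((event_to_graph(e) for e in events), conf.loader.batch_size)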
@@ -23,8 +23,8 @@ pickling_support.install()
 # Make it work ()
-# mp.set_sharing_strategy("file_descriptor")
-mp.set_sharing_strategy("file_system")
+mp.set_sharing_strategy("file_descriptor")
+# mp.set_sharing_strategy("file_system")
 # Reworked according to the recommendations in
 # https://pytorch.org/docs/stable/multiprocessing.html
......
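The strategy swap above changes how `torch.multiprocessing` shares tensor storage between the pipeline's worker processes. Per the linked PyTorch docs, `file_system` backs shared tensors with named files in shared memory, which can leak if a producer dies before its consumers are done, while `file_descriptor` passes open file descriptors over unix sockets and instead needs a sufficiently high open-file limit. A short standalone sketch of inspecting and setting the strategy; the RLIMIT bump is an optional precaution, not something this commit does:

import resource

import torch.multiprocessing as mp

# List the strategies available on this platform; on Linux both
# "file_descriptor" and "file_system" are supported.
print(mp.get_all_sharing_strategies())

# file_descriptor keeps one open fd per shared tensor, so a pipeline with
# many in-flight batches may need a raised RLIMIT_NOFILE (ulimit -n).
_soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))

mp.set_sharing_strategy("file_descriptor")
assert mp.get_sharing_strategy() == "file_descriptor"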
@@ -38,9 +38,15 @@ Processing testing batches, queuing {len(data_loader.testing_chunks)} chunks."""
 batch_list: List[GraphType] = []
 ifile = 0
 for batch in tqdm(data_loader.qfseq):
+    output_file = f"{conf.path.training}/{ifile:03d}.pt"
     if len(batch_list) == data_loader.n_test_batches:
-        torch.save(testing_batches, f"{conf.path.training}/{ifile}.pt")
+        logger.info(f"Saving {output_file}")
+        torch.save(batch_list, f"{output_file}")
         ifile += 1
         batch_list = []
     batch_list.append(batch)
-torch.save(testing_batches, f"{conf.path.training}/{ifile}.pt")
+output_file = f"{conf.path.training}/{ifile:03d}.pt"
+logger.info(f"Saving {output_file}")
+torch.save(batch_list, f"{output_file}")
+data_loader.qfseq.stop()
+exit(0)
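This last hunk is what the commit title refers to: instead of a single save at the end that referenced the undefined `testing_batches`, the loop now flushes `batch_list` to a zero-padded, numbered file whenever it reaches `n_test_batches`, writes the remainder after the loop, and explicitly stops the qf sequence so its worker processes shut down instead of idling forever. A self-contained sketch of the same flush-and-stop pattern; `save_chunks` and the toy stream are illustrative stand-ins, not names from the repo:

from pathlib import Path
from typing import Iterable, List

import torch

def save_chunks(stream: Iterable, out_dir: str, chunk_size: int) -> int:
    """Write stream to out_dir/000.pt, 001.pt, ... with chunk_size items each."""
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    buffer: List = []
    ifile = 0
    for item in stream:
        buffer.append(item)
        if len(buffer) == chunk_size:
            torch.save(buffer, f"{out_dir}/{ifile:03d}.pt")
            ifile += 1
            buffer = []
    if buffer:
        # A remainder shorter than chunk_size still gets its own file.
        torch.save(buffer, f"{out_dir}/{ifile:03d}.pt")
        ifile += 1
    return ifile  # number of files written

# Toy usage: ten random tensors split into files of four.
# save_chunks((torch.randn(3) for _ in range(10)), "out", chunk_size=4)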