Commit d7110d18 authored by mova

move queueflow to external package

parent 9798dac2
@@ -7,14 +7,13 @@ from pathlib import Path
import h5py as h5
import numpy as np
import queueflow as qf
import torch
import yaml
from torch.multiprocessing import Queue
from fgsim.config import conf
from . import qf
# Load files
ds_path = Path(conf.path.dataset)
assert ds_path.is_dir()
@@ -8,6 +8,7 @@ from pathlib import Path
import h5py as h5
import numpy as np
import queueflow as qf
import torch_geometric
import yaml
from torch.multiprocessing import Queue
@@ -16,8 +17,6 @@ from fgsim.config import conf
from fgsim.geo.batch_stack import split_layer_subgraphs
from fgsim.geo.transform import transform
from . import qf
# Load files
ds_path = Path(conf.path.dataset)
assert ds_path.is_dir()
@@ -8,6 +8,7 @@ from pathlib import Path
from typing import List, Tuple
import awkward as ak
import queueflow as qf
import torch_geometric
import uproot
import yaml
@@ -16,7 +17,6 @@ from torch_geometric.data import Data as GraphType
from fgsim.config import conf
from fgsim.geo.detid_to_graph import event_to_graph
from fgsim.io import qf
# Load files
ds_path = Path(conf.path.dataset)
@@ -10,6 +10,7 @@ from typing import Dict, List, Tuple
import awkward as ak
import numpy as np
import queueflow as qf
import torch
import uproot
import yaml
@@ -17,7 +18,6 @@ from torch.multiprocessing import Queue
from fgsim.config import conf
from fgsim.geo.geo_lup import geo_lup
from fgsim.io import qf
# Load files
ds_path = Path(conf.path.dataset)
@@ -6,12 +6,12 @@ should be passed the qfseq.
from pathlib import Path
from typing import List
import queueflow as qf
import torch
from torch.multiprocessing import Queue
from torch_geometric.data import Data as GraphType
from fgsim.config import conf
from fgsim.io import qf
# Load files
dataset_path = Path(conf.path.training)
from tblib import pickling_support
from torch import multiprocessing as mp
from .pack import PackStep, RepackStep, UnpackStep
from .pool import PoolStep
from .process_step import ProcessStep
from .sequence import Sequence
pickling_support.install()
# Two recommendations by
# https://github.com/pytorch/pytorch/issues/973
# 1. (not needed for the moment)
# import resource
# rlimit = resource.getrlimit(resource.RLIMIT_NOFILE)
# resource.setrlimit(resource.RLIMIT_NOFILE, (2000 , rlimit[1]))
# 2.
# Without the following option it crashes with
# File ".../multiprocessing/reduction.py", line 164, in recvfds
# raise RuntimeError('received %d items of ancdata' %
# RuntimeError: received 0 items of ancdata
# To make it work, use the "file_system" sharing strategy instead of the
# default "file_descriptor" strategy:
# mp.set_sharing_strategy("file_descriptor")
mp.set_sharing_strategy("file_system")
# Reworked according to the recommendations in
# https://pytorch.org/docs/stable/multiprocessing.html
# It works even though the multiprocessing used for these inputs is not
# torch.multiprocessing but just the standard multiprocessing.
__all__ = ["pack", "process_step", "sequence", "pool"]
# Usage example
# This only runs as a standalone file because the function given to the pool
# must be picklable; if it is called from another file, the defined function
# has no connection to the top-level module and therefore cannot be pickled.
# https://stackoverflow.com/questions/8804830/python-multiprocessing-picklingerror-cant-pickle-type-function
# def sleep_times_two(inp):
# name = multiprocessing.current_process().name
# print(f"!!!sleep_times_two {name} got input {inp} start sleep")
# # time.sleep(1)
# print(f"!!!sleep_times_two {name} finish sleep")
# return inp * 2
# def minus_one(inp):
# return inp - 1
# def printqueue(inp):
# print_with_lock(inp)
# return inp
# process_seq = Sequence(
#     PackStep(8),
#     ProcessStep(printqueue, 1, name="printfunction1"),
#     PoolStep(sleep_times_two, nworkers=5, name="sleep_times_two"),
#     ProcessStep(printqueue, 1, name="printfunction2"),
#     UnpackStep(),
#     ProcessStep(minus_one, nworkers=5, name="minus_one"),
# )
# res = process_seq(np.random.randint(0, 50, 19))
# oldflowstatus = ""
# for i in range(60):
# newflowstatus = res.flowstatus()
# if newflowstatus != oldflowstatus:
# print_with_lock(newflowstatus)
# oldflowstatus = newflowstatus
# else:
# print_with_lock("+", end="")
# time.sleep(0.1)
# print("foo")
# for i, e in enumerate(res):
# print_with_lock(f"({i})Final Output {e}")
# print_with_lock(res.flowstatus())
# print_with_lock("Done Iterating")
from multiprocessing.queues import Empty, Full
import torch_geometric
from fgsim.utils.batch_utils import clone_batch
from fgsim.utils.logger import logger
from .terminate_queue import TerminateQueue
class InOutStep:
def __init__(self):
self.shutdown_event = NotImplemented
def safe_put(self, queue, element):
while not self.shutdown_event.is_set():
try:
queue.put(element, True, 1)
break
except Full:
continue
class InputStep(InOutStep):
"""Internal class to read in the iterable into a the first queue"""
def __init__(self):
self.name = "input step"
def queue_iterable(self, iterable_object):
assert hasattr(iterable_object, "__iter__")
i = 0
for element in iterable_object:
self.safe_put(self.outq, element)
i = i + 1
logger.debug(f"Queuing {i} elements complete")
self.safe_put(self.outq, TerminateQueue())
def connect_to_sequence(self, output_queue, shutdown_event):
self.outq = output_queue
self.shutdown_event = shutdown_event
class OutputStep(InOutStep):
"""Internal generator class to returning the outputs from the last queue."""
def __init__(self):
self.name = "output step"
def start(self):
pass
def __iter__(self):
return self
def __next__(self):
while not self.shutdown_event.is_set():
try:
out = self.inq.get(block=True, timeout=0.05)
if isinstance(out, TerminateQueue):
logger.debug("OutputStep got terminal element.")
break
if isinstance(out, torch_geometric.data.Data):
out = clone_batch(out)
return out
except Empty:
continue
logger.debug("Sequence output ready.")
raise StopIteration
def connect_to_sequence(self, input_queue, shutdown_event):
self.inq = input_queue
self.shutdown_event = shutdown_event
from collections.abc import Iterable
from multiprocessing.queues import Empty
from fgsim.utils.logger import logger
from .step_base import StepBase
from .terminate_queue import TerminateQueue
class UnpackStep(StepBase):
"""A single process takes an iterable from the incoming queue and
puts the elements one-by-one in the outgoing queue."""
def __init__(self):
super().__init__(name="Unpack")
def __handle_terminal(self):
logger.debug(
f"""\
{self.workername} push terminal element into output queue {id(self.outq)}."""
)
self.safe_put(self.outq, TerminateQueue())
def _worker(self):
self.set_workername()
logger.info(f"{self.workername} start working")
while True:
if self.shutdown_event.is_set():
break
try:
wkin = self.inq.get(block=True, timeout=0.05)
wkin = self._clone_tensors(wkin)
except Empty:
continue
logger.debug(
f"""\
{self.workername} working on type {type(wkin)} from queue {id(self.inq)}."""
)
if isinstance(wkin, TerminateQueue):
self.__handle_terminal()
continue
if not isinstance(wkin, Iterable):
errormsg = f"""\
{self.workername} cannot iterate over element type {type(wkin)}."""
self.error_queue.put((errormsg, wkin, ValueError))
break
logger.debug(
f"{self.workername} got element of element type {type(wkin)}."
)
for element in wkin:
logger.debug(
f"""\
{self.workername} push element of type {type(element)} into output queue."""
)
self.safe_put(self.outq, element)
del wkin
self._close_queues()
logger.info(f"{self.workername} terminating")
class PackStep(StepBase):
"""Takes an iterable from the incoming queue and
puts the elements one-by-one in the outgoing queue."""
def __init__(
self,
nelements,
):
super().__init__(name=f"Pack({nelements})")
self.nelements = nelements
self.collected_elements = []
def __handle_terminal(self):
if len(self.collected_elements) > 0:
logger.debug(
f"""\
{self.workername} put remainder of size {len(self.collected_elements)} into output queue."""
)
self.safe_put(self.outq, self.collected_elements)
logger.debug(
f"""\
{self.workername} push terminal element into output queue {id(self.outq)}."""
)
self.safe_put(self.outq, TerminateQueue())
def _worker(self):
self.set_workername()
logger.info(f"{self.workername} start working")
while True:
if self.shutdown_event.is_set():
break
try:
wkin = self.inq.get(block=True, timeout=0.05)
wkin = self._clone_tensors(wkin)
except Empty:
continue
logger.debug(
f"""\
{self.workername} working on type {type(wkin)} from queue {id(self.inq)}."""
)
if isinstance(wkin, TerminateQueue):
self.__handle_terminal()
continue
logger.debug(
f"""\
{self.workername} storing element of type {type(wkin)}."""
)
self.collected_elements.append(wkin)
if len(self.collected_elements) == self.nelements:
logger.debug(
f"""\
{self.workername} push list of type \
{type(self.collected_elements[-1])} into output queue {id(self.outq)}."""
)
self.safe_put(self.outq, self.collected_elements)
self.collected_elements = []
del wkin
self._close_queues()
class RepackStep(StepBase):
"""Takes an iterable from the incoming queue,
collects n elements and packs them as a list in the outgoing queue."""
def __init__(self, nelements):
super().__init__(name=f"Repack({nelements})")
self.nelements = nelements
self.collected_elements = []
def __handle_terminal(self):
if len(self.collected_elements) > 0:
logger.debug(
f"""\
{self.workername} put remainder of size {len(self.collected_elements)} into output queue."""
)
self.safe_put(self.outq, self.collected_elements)
logger.debug(
f"""\
{self.workername} push terminal element into output queue {id(self.outq)}."""
)
self.safe_put(self.outq, TerminateQueue())
logger.warning(
f"""\
{self.workername} finished with iterable (in {self.count_in}/out {self.count_out})"""
)
self.count_in, self.count_out = 0, 0
def _worker(self):
self.set_workername()
logger.info(f"{self.workername} start working")
while True:
if self.shutdown_event.is_set():
break
try:
wkin = self.inq.get(block=True, timeout=0.05)
except Empty:
continue
logger.debug(
f"""
{self.workername} working on type {type(wkin)} from queue {id(self.inq)}."""
)
if isinstance(wkin, TerminateQueue):
self.__handle_terminal()
continue
if not isinstance(wkin, Iterable):
errormsg = f"""\
{self.workername} cannot iterate over element type {type(wkin)}."""
self.error_queue.put((errormsg, wkin, ValueError))
break
self.count_in += 1
logger.debug(
f"""\
{self.workername} storing element of type {type(wkin)} \
(len {len(wkin) if hasattr(wkin,'__len__') else '?'})."""
)
for element in wkin:
self.collected_elements.append(element)
if len(self.collected_elements) == self.nelements:
logger.debug(
f"""\
{self.workername} push list of type {type(self.collected_elements[-1])} \
with {self.nelements} elements into output queue {id(self.outq)}."""
)
self.safe_put(self.outq, self.collected_elements)
self.collected_elements = []
self.count_out += 1
del wkin
self._close_queues()
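# Hedged sketch (not part of the original module) of the packing semantics these
# steps implement, mirroring the commented usage example in the package __init__.
# The Sequence call convention (called on an iterable, returning an iterable of
# results) is assumed from that example; the values are hypothetical.
# from queueflow import PackStep, RepackStep, Sequence, UnpackStep
# seq = Sequence(PackStep(4), UnpackStep(), RepackStep(2))
# for chunk in seq(range(10)):
#     print(chunk)  # lists of 2 elements: [0, 1], [2, 3], ..., [8, 9]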
from collections.abc import Iterable
from multiprocessing.queues import Empty
from torch import multiprocessing as mp
from fgsim.utils.count_iterations import CountIterations
from fgsim.utils.logger import logger
from .step_base import StepBase
from .terminate_queue import TerminateQueue
class PoolStep(StepBase):
"""Class for simple processing steps pooled over multiple workes.
Each incoming object is processed by a multiple subprocesses
per worker into a single outgoing element."""
def __init__(
self,
*args,
nworkers: int,
**kwargs,
):
# Spawn only one non-daemonized process, which can in turn spawn the Pool
kwargs["deamonize"] = False
# Make sure the constructor of the base class only initializes
# one process, which manages the pool
self.n_pool_workers = nworkers
kwargs["nworkers"] = 1
super().__init__(*args, **kwargs)
def start(self):
for p in self.processes:
p.daemon = self.deamonize
p.start()
def stop(self):
for p in self.processes:
if p.is_alive():
p.join(5)
p.kill()
def process_status(self):
return (
sum([p.is_alive() for p in self.processes]) * self.n_pool_workers,
self.n_pool_workers,
)
def _worker(self):
self.set_workername()
logger.debug(
f"{self.workername} pool initalizing with"
f" {self.n_pool_workers} subprocesses"
)
self.pool = mp.Pool(self.n_pool_workers)
while not self.shutdown_event.is_set():
try:
wkin = self.inq.get(block=True, timeout=0.05)
except Empty:
continue
logger.debug(
f"""\
{self.workername} working on element of type {type(wkin)} from queue {id(self.inq)}."""
)
# If the process gets a TerminateQueue object, it puts the terminal
# element into the outgoing queue and resets the counters.
if isinstance(wkin, TerminateQueue):
logger.info(f"{self.workername} terminating")
self.safe_put(self.outq, TerminateQueue())
logger.warning(
f"""\
{self.workername} finished with iterable (in {self.count_in}/out {self.count_out})"""
)
self.count_in, self.count_out = 0, 0
continue
self.count_in += 1
wkin = self._clone_tensors(wkin)
assert isinstance(wkin, Iterable)
logger.debug(
f"{self.workername} got element"
+ f" {id(wkin)} of element type {type(wkin)}."
)
wkin_iter = CountIterations(wkin)
try:
wkout_async_res = self.pool.map_async(
self.workerfn,
wkin_iter,
)
while True:
if wkout_async_res.ready():
wkout = wkout_async_res.get()
break
elif self.shutdown_event.is_set():
break
wkout_async_res.wait(1)
if self.shutdown_event.is_set():
break
except Exception as error:
logger.warning(f"""{self.workername} got error""")
self.handle_error(error, wkin)
break
# if wkin_iter.count > 200:
# logger.warning(
# f"""\
# Giving large iterables ({wkin_iter.count})\
# to a worker can lead to crashes.
# Lower the number here if you see an error like \
# 'RuntimeError: unable to mmap x bytes from file </torch_x>:
# Cannot allocate memory'"""
# )
logger.debug(
f"""\
{self.workername} push pool output list {id(wkout)} with \
element type {type(wkin)} into output queue {id(self.outq)}."""
)
# Put while there is no shutdown event
self.safe_put(self.outq, wkout)
self.count_out += 1
del wkin_iter
del wkin
self.pool.close()
self.pool.terminate()
logger.debug(f"""{self.workername} pool closed""")
self.outq.cancel_join_thread()
self._close_queues()
logger.debug(f"""{self.workername} queues closed""")
from multiprocessing.queues import Empty
from torch import multiprocessing as mp
from fgsim.utils.logger import logger
from .step_base import StepBase
from .terminate_queue import TerminateQueue
class ProcessStep(StepBase):
"""Class for simple processing steps.
Each incoming object is processed by a
single worker into a single outgoing element."""
def __init__(
self,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.finish_barrier = mp.Barrier(parties=self.nworkers)
self.sync_barrier = mp.Barrier(parties=self.nworkers)
def __handle_terminal(self):
logger.debug(f"{self.workername} Got terminal element.")
# Put the terminal element back in the input queue
self.safe_put(self.inq, TerminateQueue())
# The first worker to reach the terminal element acquires the
# barrier and waits for the other processes to finish
logger.debug(f"{self.workername} waiting at barrier.")
self.sync_barrier.reset()
if self.finish_barrier.wait() == 0:
assert isinstance(self.inq.get(), TerminateQueue)
self.safe_put(self.outq, TerminateQueue())
logger.debug(f"{self.workername} put terminal element in outq.")
if self.sync_barrier.wait() == 0:
self.finish_barrier.reset()
logger.debug(
f"""\
{self.workername} finished with iterable (in {self.count_in}/out {self.count_out})"""