in labgraph/runners/local_runner.py [0:0]
def run(self) -> None:
"""
Runs the `_AsyncThread`'s event loop.
"""
logger.debug(f"{self.module}:started background thread")
# We import asyncio here and keep asyncio object construction within this method
# in order to ensure that no asyncio state leaks into the global state,
# guaranteeing it will never be pickled. We want this because subtle bugs can
# arise when asyncio objects are transported across process boundaries. Many
# of the objects in this method are kept as locals rather than being set as
# properties in order to avoid accidentally accessing them from the main thread,
# e.g., in the `LocalRunner`.
import asyncio
# Create event loop
loop = asyncio.new_event_loop()
loop.set_exception_handler(self.handle_exception)
asyncio.set_event_loop(loop)
try:
# Create callback methods that run in the event loop
with self.state.lock:
for stream in self.module.__streams__.values():
callbacks = []
for subscriber_path, subscriber in self.module.subscribers.items():
if subscriber.subscribed_topic_path in stream.topic_paths:
if isinstance(subscriber, Transformer):
callbacks.append(
self.wrap_transformer_callback(
transformer_path=subscriber_path, loop=loop
)
)
else:
callbacks.append(
self.wrap_subscriber_callback(
subscriber_path=subscriber_path, loop=loop
)
)
stream_callback = self.wrap_all_callbacks(callbacks, loop=loop)
if self.options.aligner is not None:
# Inject aligner into callback if present
self.options.aligner.register(stream.id, stream_callback)
stream_callback = self.options.aligner.push
self.state.callbacks[stream.id] = stream_callback
# Thread barrier: wait for nodes' setup + signal to main thread that
# callbacks are ready
self.state.setup_barrier.wait()
# Thread event: wait for main thread to set up Cthulhu
self.state.ready_event.wait()
# Schedule startup coroutines in event loop
for awaitable in self.get_startup_methods():
asyncio.ensure_future(awaitable, loop=loop)
# Schedule aligner task
if self.options.aligner is not None:
logger.debug(f"{self.module}:background thread:run aligner")
loop.create_task(self.options.aligner.run())
logger.debug(f"{self.module}:background thread:run event loop")
with contextlib.ExitStack() as run_stack:
if "PROFILE" in os.environ:
# Run yappi profiling
run_stack.enter_context(yappi.run())
# Run event loop
while self.runner._running:
loop.run_until_complete(asyncio.sleep(0.01))
except BaseException:
logger.debug(f"{self.module}:handling exception in background thread")
self.runner._handle_exception()
if not self.state.cleanup_started and self.state.setup_complete:
# The main thread may not be able to run cleanup if it is blocked by a
# @main function. So we try to run cleanup in the background thread
# first.
self.state.cleanup_started = True
logger.debug(f"{self.module}:running cleanup in background thread")
self.runner._run_cleanup()
logger.debug(f"{self.module}:cleanup complete")
# Terminate the aligner
if self.options.aligner is not None:
logger.debug(f"{self.module}:background thread:terminate aligner")
self.options.aligner.wait_for_completion()
logger.debug(f"{self.module}:background thread:shutting down async gens")
# https://bugs.python.org/issue38559
for task in asyncio.Task.all_tasks(loop=loop):
task.cancel()
loop.run_until_complete(loop.shutdown_asyncgens())
logger.debug(f"{self.module}:background thread:waiting for pending tasks")
pending_start_time = time.perf_counter()
while True:
time.sleep(ASYNCIO_SHUTDOWN_POLL_TIME)
pending = [
task for task in asyncio.Task.all_tasks(loop=loop) if not task.done()
]
if len(pending) == 0:
logger.debug(f"{self.module}:background thread:closing event loop")
loop.close()
return
elif time.perf_counter() - pending_start_time >= ASYNCIO_SHUTDOWN_TIME:
logger.warning(
f"{self.module}:background thread:closing event loop with "
f"{len(pending)} tasks left"
)
# Suppress exception handling - otherwise the handler catches "Task
# was destroyed but it is pending!"
loop.set_exception_handler(lambda _l, _c: None)
try:
loop.close()
except Exception:
# Exception expected with pending tasks
pass
return
logger.debug(f"{self.module}:{len(pending)} tasks left")
loop.run_until_complete(asyncio.sleep(1))