in labgraph/runners/local_runner.py [0:0]
def run(self) -> None:
    """
    Starts the Labgraph module. Returns when the module has terminated.

    Startup order: optional profiling, async thread, monitor thread, nodes'
    setup, setup barrier, Cthulhu setup, optional cross-process readiness
    handshake, ready event, @main, then join the async thread.

    Any exception raised during startup or while running (including one
    recorded in `self._exception` by the background thread) is routed through
    `self._handle_exception()`. Shutdown work in the `finally` clause always
    runs afterwards.

    Raises:
        The exception stored in `self._exception`, but only when
        `self._options.bootstrap_info` is None. When bootstrap info is
        present, the TERMINATED phase is published instead and nothing is
        re-raised from here.
    """
    # Bound up front so the `finally` clause can tell whether the monitor
    # thread was actually started; a failure earlier in the `try` body would
    # otherwise surface as an UnboundLocalError during shutdown and skip the
    # remaining teardown steps.
    monitor_thread = None

    try:
        if should_profile():
            yappi.set_clock_type("cpu")
            yappi.start()

        self._running = True
        logger.debug(f"{self._module}:started")
        self._state = LocalRunnerState()

        # Start the background thread (runs the event loop)
        async_thread = _AsyncThread(runner=self)
        async_thread.start()
        logger.debug(f"{self._module}:asyncio thread starting")

        # Start the monitor thread (monitors the graph for termination).
        # Only publish it to `monitor_thread` once start() has succeeded so
        # that shutdown never tries to join a thread that was not started.
        started_monitor = _MonitorThread(runner=self)
        started_monitor.start()
        monitor_thread = started_monitor
        logger.debug(f"{self._module}:monitor thread started")

        # Run nodes' setup() functions
        logger.debug(f"{self._module}:setup running")
        self._run_setup()
        self._state.setup_complete = True
        logger.debug(f"{self._module}:setup done")

        # Thread barrier: signal nodes' setup is done + wait for background thread
        # to set up callbacks
        self._state.setup_barrier.wait()
        logger.debug(f"{self._module}:asyncio thread ready")

        # Set up Cthulhu
        logger.debug(f"{self._module}:cthulhu setting up")
        self._setup_cthulhu()
        logger.debug(f"{self._module}:cthulhu ready")

        # Coordinate process readiness with other processes, if present
        if self._options.bootstrap_info is not None:
            # Signal that this process is ready
            self._options.bootstrap_info.process_manager_state.update(
                self._options.bootstrap_info.process_name, ProcessPhase.READY
            )

            logger.debug(f"{self._module}:waiting for other processes in graph")
            self._wait_for_ready()
            logger.debug(f"{self._module}:graph ready")
            # Signal that this process is now running
            self._options.bootstrap_info.process_manager_state.update(
                self._options.bootstrap_info.process_name, ProcessPhase.RUNNING
            )

        # Thread event: signal to background thread that Cthulhu and graph are
        # set up
        self._state.ready_event.set()

        # Run @main method, if any
        logger.debug(f"{self._module}:@main running")
        self._run_main()
        logger.debug(f"{self._module}:@main complete")

        # Wait for the background thread to finish (this will happen shortly after
        # producers stop producing, causing the task queue to clear up, or when an
        # exception is raised)
        logger.debug(f"{self._module}:waiting for async thread")
        async_thread.join()
        logger.debug(f"{self._module}:async thread complete")

        # If the background thread raised an exception, re-raise it on the main
        # thread (it is then caught by the handler below)
        if self._exception is not None:
            raise self._exception

    except BaseException:
        # BaseException (not Exception) so that KeyboardInterrupt/SystemExit
        # also go through the runner's exception handling.
        self._handle_exception()
    finally:
        self._running = False
        if self._options.bootstrap_info is not None:
            # Signal that this process is stopping
            self._options.bootstrap_info.process_manager_state.update(
                self._options.bootstrap_info.process_name, ProcessPhase.STOPPING
            )

        if not self._state.cleanup_started and self._state.setup_complete:
            # Run nodes' cleanup() functions; the flag is set first so cleanup
            # is not started a second time elsewhere
            self._state.cleanup_started = True
            logger.debug(f"{self._module}:running cleanup in main thread")
            self._run_cleanup()
            logger.debug(f"{self._module}:cleanup complete")

        if monitor_thread is not None:
            logger.debug(f"{self._module}:waiting for monitor thread")
            monitor_thread.join()
            logger.debug(f"{self._module}:monitor thread complete")

        if should_profile():
            yappi.stop()
            write_profiling_results(self._module)

        # Log a performance summary for every consumer that was called at
        # least once
        for stream_id, consumer in self._state.consumers.items():
            performance_summary = consumer.get_performance_summary()
            if performance_summary.num_calls > 0:
                logger.info(
                    f"PERFORMANCE SUMMARY FOR {stream_id}:\n"
                    f"{format_performance_summary(performance_summary)}"
                )

        logger.debug(f"{self._module}:terminating")
        if self._options.bootstrap_info is not None:
            # Signal that this process has terminated
            self._options.bootstrap_info.process_manager_state.update(
                self._options.bootstrap_info.process_name, ProcessPhase.TERMINATED
            )
        else:
            # If there is an exception, raise it to the caller
            if self._exception is not None:
                raise self._exception