def run()

in labgraph/runners/local_runner.py


    def run(self) -> None:
        """
        Starts the LabGraph module. Returns when the module has terminated.
        """
        try:
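            # Start yappi in CPU-clock mode so profiling measures CPU time across threads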
            if should_profile():
                yappi.set_clock_type("cpu")
                yappi.start()
            self._running = True
            logger.debug(f"{self._module}:started")
            self._state = LocalRunnerState()

            # Start the background thread (runs the event loop)
            async_thread = _AsyncThread(runner=self)
            async_thread.start()
            logger.debug(f"{self._module}:asyncio thread starting")

            # Start the monitor thread (monitors the graph for termination)
            monitor_thread = _MonitorThread(runner=self)
            monitor_thread.start()
            logger.debug(f"{self._module}:monitor thread started")

            # Run nodes' setup() functions
            logger.debug(f"{self._module}:setup running")
            self._run_setup()
            self._state.setup_complete = True
            logger.debug(f"{self._module}:setup done")

            # Thread barrier: signal nodes' setup is done + wait for background thread
            # to set up callbacks
            self._state.setup_barrier.wait()
            logger.debug(f"{self._module}:asyncio thread ready")

            # Set up Cthulhu
            logger.debug(f"{self._module}:cthulhu setting up")
            self._setup_cthulhu()
            logger.debug(f"{self._module}:cthulhu ready")

            # Coordinate process readiness with other processes, if present
            if self._options.bootstrap_info is not None:
                # Signal that this process is ready
                self._options.bootstrap_info.process_manager_state.update(
                    self._options.bootstrap_info.process_name, ProcessPhase.READY
                )
                logger.debug(f"{self._module}:waiting for other processes in graph")
                self._wait_for_ready()
                logger.debug(f"{self._module}:graph ready")
                self._options.bootstrap_info.process_manager_state.update(
                    self._options.bootstrap_info.process_name, ProcessPhase.RUNNING
                )

            # Thread event: signal to background thread that Cthulhu and graph are
            # set up
            self._state.ready_event.set()

            # Run @main method, if any
            logger.debug(f"{self._module}:@main running")
            self._run_main()
            logger.debug(f"{self._module}:@main complete")

            # Wait for the background thread to finish (this will happen shortly after
            # producers stop producing, causing the task queue to clear up, or when an
            # exception is raised)
            logger.debug(f"{self._module}:waiting for async thread")
            async_thread.join()
            logger.debug(f"{self._module}:async thread complete")

            # If the background thread raised an exception, re-raise it on the main
            # thread
            if self._exception is not None:
                raise self._exception
        except BaseException:
            self._handle_exception()
        finally:
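            # Shutdown path (always runs): mark the runner stopped, signal STOPPING,
            # run cleanup, join the monitor thread, and report profiling results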
            self._running = False
            if self._options.bootstrap_info is not None:
                # Signal that this process is stopping
                self._options.bootstrap_info.process_manager_state.update(
                    self._options.bootstrap_info.process_name, ProcessPhase.STOPPING
                )
            if not self._state.cleanup_started and self._state.setup_complete:
                # Run nodes' cleanup() functions
                self._state.cleanup_started = True
                logger.debug(f"{self._module}:running cleanup in main thread")
                self._run_cleanup()
                logger.debug(f"{self._module}:cleanup complete")

            logger.debug(f"{self._module}:waiting for monitor thread")
            monitor_thread.join()
            logger.debug(f"{self._module}:monitor thread complete")

            if should_profile():
                yappi.stop()
                write_profiling_results(self._module)

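                # Log a performance summary for each stream consumer that was called at least once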
                for stream_id, consumer in self._state.consumers.items():
                    performance_summary = consumer.get_performance_summary()
                    if performance_summary.num_calls > 0:
                        logger.info(
                            f"PERFORMANCE SUMMARY FOR {stream_id}:\n"
                            f"{format_performance_summary(performance_summary)}"
                        )

            logger.debug(f"{self._module}:terminating")

            if self._options.bootstrap_info is not None:
                # Signal that this process has terminated
                self._options.bootstrap_info.process_manager_state.update(
                    self._options.bootstrap_info.process_name, ProcessPhase.TERMINATED
                )
            else:
                # If there is an exception, raise it to the caller
                if self._exception is not None:
                    raise self._exception
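
A minimal usage sketch (an assumption, not part of the source file), following LabGraph's documented pattern of constructing a LocalRunner with the module to run and calling run() from the main thread; MyGraph is a hypothetical lg.Graph subclass defined elsewhere.

    import labgraph as lg

    # Hypothetical graph: any lg.Graph subclass with its nodes, topics, and
    # connections defined elsewhere would be driven the same way.
    graph = MyGraph()
    runner = lg.LocalRunner(module=graph)
    runner.run()  # blocks until the graph terminates; re-raises any exception from the run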