def run()

in labgraph/runners/local_runner.py [0:0]


    def run(self) -> None:
        """
        Runs the `_AsyncThread`'s event loop.
        """

        logger.debug(f"{self.module}:started background thread")

        # We import asyncio here and keep asyncio object construction within this method
        # in order to ensure that no asyncio state leaks into the global state,
        # guaranteeing it will never be pickled. We want this because subtle bugs can
        # arise when asyncio objects are transported across process boundaries. Many
        # of the objects in this method are kept as locals rather than being set as
        # properties in order to avoid accidentally accessing them from the main thread,
        # e.g., in the `LocalRunner`.

        import asyncio

        # Create event loop
        loop = asyncio.new_event_loop()
        loop.set_exception_handler(self.handle_exception)
        asyncio.set_event_loop(loop)

        try:
            # Create callback methods that run in the event loop
            with self.state.lock:
                for stream in self.module.__streams__.values():
                    callbacks = []
                    for subscriber_path, subscriber in self.module.subscribers.items():
                        if subscriber.subscribed_topic_path in stream.topic_paths:
                            if isinstance(subscriber, Transformer):
                                callbacks.append(
                                    self.wrap_transformer_callback(
                                        transformer_path=subscriber_path, loop=loop
                                    )
                                )
                            else:
                                callbacks.append(
                                    self.wrap_subscriber_callback(
                                        subscriber_path=subscriber_path, loop=loop
                                    )
                                )

                    stream_callback = self.wrap_all_callbacks(callbacks, loop=loop)

                    if self.options.aligner is not None:
                        # Inject aligner into callback if present
                        self.options.aligner.register(stream.id, stream_callback)
                        stream_callback = self.options.aligner.push

                    self.state.callbacks[stream.id] = stream_callback

            # Thread barrier: wait for nodes' setup + signal to main thread that
            # callbacks are ready
            self.state.setup_barrier.wait()

            # Thread event: wait for main thread to set up Cthulhu
            self.state.ready_event.wait()

            # Schedule startup coroutines in event loop
            for awaitable in self.get_startup_methods():
                asyncio.ensure_future(awaitable, loop=loop)

            # Schedule aligner task
            if self.options.aligner is not None:
                logger.debug(f"{self.module}:background thread:run aligner")
                loop.create_task(self.options.aligner.run())

            logger.debug(f"{self.module}:background thread:run event loop")

            with contextlib.ExitStack() as run_stack:
                if "PROFILE" in os.environ:
                    # Run yappi profiling
                    run_stack.enter_context(yappi.run())

                # Run event loop
                while self.runner._running:
                    loop.run_until_complete(asyncio.sleep(0.01))
        except BaseException:
            logger.debug(f"{self.module}:handling exception in background thread")
            self.runner._handle_exception()

        if not self.state.cleanup_started and self.state.setup_complete:
            # The main thread may not be able to run cleanup if it is blocked by a
            # @main function. So we try to run cleanup in the background thread
            # first.
            self.state.cleanup_started = True
            logger.debug(f"{self.module}:running cleanup in background thread")
            self.runner._run_cleanup()
            logger.debug(f"{self.module}:cleanup complete")

        # Terminate the aligner
        if self.options.aligner is not None:
            logger.debug(f"{self.module}:background thread:terminate aligner")
            self.options.aligner.wait_for_completion()
        logger.debug(f"{self.module}:background thread:shutting down async gens")

        # https://bugs.python.org/issue38559
        for task in asyncio.Task.all_tasks(loop=loop):
            task.cancel()
        loop.run_until_complete(loop.shutdown_asyncgens())

        logger.debug(f"{self.module}:background thread:waiting for pending tasks")
        pending_start_time = time.perf_counter()
        while True:
            time.sleep(ASYNCIO_SHUTDOWN_POLL_TIME)
            pending = [
                task for task in asyncio.Task.all_tasks(loop=loop) if not task.done()
            ]
            if len(pending) == 0:
                logger.debug(f"{self.module}:background thread:closing event loop")
                loop.close()
                return
            elif time.perf_counter() - pending_start_time >= ASYNCIO_SHUTDOWN_TIME:
                logger.warning(
                    f"{self.module}:background thread:closing event loop with "
                    f"{len(pending)} tasks left"
                )
                # Suppress exception handling - otherwise the handler catches "Task
                # was destroyed but it is pending!"
                loop.set_exception_handler(lambda _l, _c: None)
                try:
                    loop.close()
                except Exception:
                    # Exception expected with pending tasks
                    pass
                return
            logger.debug(f"{self.module}:{len(pending)} tasks left")
            loop.run_until_complete(asyncio.sleep(1))