def stop()

in src/dubbo/remoting/aio/event_loop.py [0:0]


    def stop(self, wait: bool = False) -> None:
        """
        Stop the asyncio event loop.

        Schedules ``self._stop`` on ``self._loop`` from the calling thread and
        then marks this object as stopped (``_stopped = True``,
        ``_started = False``). Calling it again once ``_stopped`` is set is a
        no-op, including when several threads call it concurrently.

        :param wait: If ``True``, block until the ``threading.Event`` handed to
            ``_stop`` is set (presumably by ``_stop`` once the loop has
            stopped -- confirm against its implementation) and, when
            ``self._in_other_tread`` is truthy, until ``self._thread`` has
            been joined. If ``False`` (default), return right after the stop
            coroutine has been scheduled.
        :type wait: bool
        """
        # Lock-free fast path for the common "already stopped" case.
        if self._stopped:
            return
        with self._lock:
            # Re-check under the lock: another caller may have completed the
            # stop while we were waiting to acquire it. Without this check the
            # stop coroutine would be scheduled a second time, and with
            # wait=True we could block on a signal that is never set.
            if self._stopped:
                return
            signal = threading.Event()
            asyncio.run_coroutine_threadsafe(self._stop(signal=signal), self._loop)
            # Wait for the running_loop to stop
            if wait:
                # NOTE(review): this blocks the calling thread; it presumably
                # must not be called with wait=True from the loop's own
                # thread -- confirm against callers.
                signal.wait()
                if self._in_other_tread:
                    self._thread.join()
            self._stopped = True
            self._started = False