in src/dubbo/remoting/aio/event_loop.py [0:0]
def stop(self, wait: bool = False) -> None:
"""
Stop the asyncio event loop.
"""
if self._stopped:
return
with self._lock:
signal = threading.Event()
asyncio.run_coroutine_threadsafe(self._stop(signal=signal), self._loop)
# Wait for the running_loop to stop
if wait:
signal.wait()
if self._in_other_tread:
self._thread.join()
self._stopped = True
self._started = False