src/dubbo/remoting/aio/aio_transporter.py (163 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import asyncio import concurrent import threading from typing import Union from dubbo.constants import common_constants from dubbo.loggers import loggerFactory from dubbo.remoting._interfaces import Client, Server, Transporter from dubbo.remoting.aio import ConnectionStateListener, constants as aio_constants from dubbo.remoting.aio.event_loop import EventLoop from dubbo.remoting.aio.exceptions import RemotingError from dubbo.url import URL from dubbo.utils import FutureHelper _LOGGER = loggerFactory.get_logger() class AioClient(Client, ConnectionStateListener): """ Asyncio client. """ __slots__ = [ "_global_lock", "_protocol", "_connected", "_closed", "_active_close", "_event_loop", ] def __init__(self, url: URL): """ Initialize the client. :param url: The URL. :type url: URL """ super().__init__(url) self._global_lock = threading.Lock() # Set the side of the transporter to client. self._protocol = None # the status of the client self._connected = False self._closed = False self._active_close = False # event loop self._event_loop: EventLoop = EventLoop() # connect to the server self.connect() def is_connected(self) -> bool: """ Check if the client is connected. """ return self._connected def is_closed(self) -> bool: """ Check if the client is closed. """ return self._closed def connect(self) -> None: """ Connect to the server. """ with self._global_lock: if self.is_connected(): return elif self.is_closed(): raise RemotingError("The client is closed.") # Run the connection logic in the event loop. if self._event_loop.stopped: raise RemotingError("The event loop is stopped.") elif not self._event_loop.started: self._event_loop.start() future = concurrent.futures.Future() asyncio.run_coroutine_threadsafe(self._do_connect(future), self._event_loop.loop) try: self._protocol = future.result(timeout=3) _LOGGER.info( "Connected to the server. host: %s, port: %s", self._url.host, self._url.port, ) except Exception: raise RemotingError(f"Failed to connect to the server. host: {self._url.host}, port: {self._url.port}") async def _do_connect(self, future: Union[concurrent.futures.Future, asyncio.Future]): """ Connect to the server. """ running_loop = asyncio.get_running_loop() # Create the connection. _, protocol = await running_loop.create_connection( lambda: self._url.attributes[common_constants.PROTOCOL_KEY](self._url, self), self._url.host, self._url.port, ) # Set the protocol. FutureHelper.set_result(future, protocol) def close(self) -> None: """ Close the client. """ with self._global_lock: if self.is_closed(): return self._active_close = True self._protocol.close() async def connection_made(self): # Update the connection status. self._connected = True async def connection_lost(self, exc): self._connected = False self._closed = True # Check if it is an active shutdown if self._active_close: self._event_loop.stop() else: # try reconnect for _ in range(aio_constants.RECONNECT_TIMES): try: future = asyncio.Future() await self._do_connect(future) # Update the protocol. self._protocol = future.result() # Update the connection status. self._connected = True self._closed = False self._active_close = False _LOGGER.info( "Reconnected to the server. host: %s, port: %s", self._url.host, self._url.port, ) return except Exception as e: exc = e _LOGGER.error("Failed to reconnect to the server. %s", exc) # wait for a while await asyncio.sleep(1) # cannot reconnect raise RemotingError( f"Failed to reconnect to the server.{exc}", ) class AioServer(Server): """ Asyncio server. """ def __init__(self, url: URL): self._url = url # Set the side of the transporter to server. self._url.parameters[common_constants.SIDE_KEY] = common_constants.SERVER_VALUE # the event to indicate the close status of the server self._event_loop = EventLoop() self._event_loop.start() # Whether the server is exporting self._exporting = False # Whether the server is exported self._exported = False # Whether the server is closing self._closing = False # Whether the server is closed self._closed = False # start the server self.export() def is_exported(self) -> bool: return self._exported or self._exporting def is_closed(self) -> bool: return self._closed or self._closing def export(self): """ Export the server. """ if self.is_exported(): return elif self.is_closed(): raise RemotingError("The server is closed.") async def _inner_operation(_future: concurrent.futures.Future): try: running_loop = asyncio.get_running_loop() server = await running_loop.create_server( lambda: self._url.attributes[common_constants.PROTOCOL_KEY](self._url), self._url.host, self._url.port, ) # Serve the server forever async with server: FutureHelper.set_result(_future, None) await server.serve_forever() except Exception as e: FutureHelper.set_exception(_future, e) # Run the server logic in the event loop. future = concurrent.futures.Future() asyncio.run_coroutine_threadsafe(_inner_operation(future), self._event_loop.loop) try: exc = future.exception() if exc: raise RemotingError("Failed to export the server") from exc else: self._exported = True _LOGGER.info( "Exported the server. host: %s, port: %s", self._url.host, self._url.port, ) finally: self._exporting = False def close(self): """ Close the server. """ if self.is_closed(): return self._closing = True try: self._event_loop.stop() self._closed = True except Exception as e: raise RemotingError("Failed to close the server") from e finally: self._closing = False class AioTransporter(Transporter): """ Asyncio transporter. """ def connect(self, url: URL) -> Client: return AioClient(url) def bind(self, url: URL) -> Server: return AioServer(url)