in src/dubbo/remoting/aio/http2/protocol.py [0:0]
def __init__(self, url: URL, h2_config: H2Configuration):
    """
    Initialise the per-connection HTTP/2 protocol state.

    Must be called while an event loop is running, because the loop is
    captured with ``asyncio.get_running_loop()``.

    :param url: URL whose ``attributes`` mapping is read for the protocol
        marker and for either the listener factory (when the marker equals
        ``Http2ServerProtocol``) or a ready-made stream handler (otherwise).
    :param h2_config: Configuration handed to ``H2Connection``.
    """
    self._url = url
    self._loop = asyncio.get_running_loop()

    # H2 state machine driving this connection
    self._h2_connection = H2Connection(h2_config)

    # Transport and flow controller are not known yet; both start unset.
    self._transport: Optional[asyncio.Transport] = None
    self._flow_controller: Optional[RemoteFlowController] = None

    # Pick the stream handler: the server side wraps the listener factory in
    # a multiplex handler, any other protocol uses the handler stored on the URL.
    attrs = self._url.attributes
    is_server = attrs[common_constants.PROTOCOL_KEY] == Http2ServerProtocol
    self._stream_handler = (
        StreamServerMultiplexHandler(attrs[h2_constants.LISTENER_FACTORY_KEY])
        if is_server
        else attrs[h2_constants.STREAM_HANDLER_KEY]
    )

    # Wall-clock timestamps of the most recent receive and send; both are
    # seeded with the construction time.
    self._last_read = time.time()
    self._last_write = time.time()