in src/dubbo/protocol/triple/protocol.py [0:0]
def export(self, url: URL):
"""
Export a service.
"""
if self._server is not None:
return
service_handler = url.attributes[common_constants.SERVICE_HANDLER_KEY]
if isinstance(service_handler, Iterable):
for handler in service_handler:
self._path_resolver[handler.service_name] = handler
else:
self._path_resolver[service_handler.service_name] = service_handler
method_executor = ThreadPoolExecutor(thread_name_prefix=f"dubbo_tri_method_{str(uuid.uuid4())}", max_workers=10)
listener_factory = functools.partial(ServerTransportListener, self._path_resolver, method_executor)
# set stream handler and protocol
url.attributes[aio_constants.LISTENER_FACTORY_KEY] = listener_factory
url.attributes[common_constants.PROTOCOL_KEY] = Http2ServerProtocol
# Create a server
self._server = self._transporter.bind(url)