in src/dubbo/server.py [0:0]
def _initialize(self):
"""
Initialize the server.
"""
with self._global_lock:
if self._initialized:
return
# get the protocol
service_protocol = extensionLoader.get_extension(Protocol, self._service.protocol)()
registry_config = self._dubbo.registry_config
self._protocol = (
RegistryProtocol(registry_config, service_protocol) if self._dubbo.registry_config else service_protocol
)
# build url
service_url = self._service.to_url()
if registry_config:
self._url = registry_config.to_url().copy()
self._url.attributes[common_constants.EXPORT_KEY] = service_url
for k, v in service_url.attributes.items():
self._url.attributes[k] = v
else:
self._url = service_url