in src/dubbo/client.py [0:0]
def _initialize(self):
    """
    Initialize the invoker on first use.

    The whole body runs while holding ``self._global_lock``. If
    ``self._initialized`` is already set the call returns immediately.
    Otherwise it:

    1. instantiates the protocol extension named by
       ``self._reference.protocol``;
    2. wraps it in a ``RegistryProtocol`` when a registry config is present;
    3. builds the URL (see the inline comments below);
    4. stores the invoker returned by ``self._protocol.refer(self._url)``.

    Sets ``self._protocol``, ``self._url``, ``self._invoker`` and, last of
    all, ``self._initialized``. Exceptions raised along the way are not
    caught here; because the flag is only set at the very end, a failed
    attempt leaves ``self._initialized`` unchanged.
    """
    with self._global_lock:
        if self._initialized:
            return

        # get the protocol
        protocol = extensionLoader.get_extension(Protocol, self._reference.protocol)()

        # Read the registry config exactly once: the protocol choice and the
        # URL construction below must be based on the same value.
        registry_config = self._dubbo.registry_config
        self._protocol = RegistryProtocol(registry_config, protocol) if registry_config else protocol

        # build url
        reference_url = self._reference.to_url()
        if registry_config:
            # Start from a copy of the registry URL, then take the path from
            # the reference URL and write every reference parameter on top.
            self._url = registry_config.to_url().copy()
            self._url.path = reference_url.path
            for k, v in reference_url.parameters.items():
                self._url.parameters[k] = v
        else:
            self._url = reference_url

        # create invoker
        self._invoker = self._protocol.refer(self._url)

        # Set only after everything above succeeded.
        self._initialized = True