in src/dubbo/extension/extension_loader.py [0:0]
def get_extension(self, interface: Any, impl_name: str) -> Any:
"""
Get the extension implementation for the interface.
:param interface: Interface class.
:type interface: Any
:param impl_name: Implementation name.
:type impl_name: str
:return: Extension implementation class.
:rtype: Any
:raises ExtensionError: If the interface or implementation is not found.
"""
# Get the registry for the interface
impls = self._registries.get(interface)
if not impls:
raise ExtensionError(f"Interface '{interface.__name__}' is not supported.")
# Get the full name of the implementation
full_name = impls.get(impl_name)
if not full_name:
raise ExtensionError(f"Implementation '{impl_name}' for interface '{interface.__name__}' is not exist.")
try:
# Split the full name into module and class
module_name, class_name = full_name.rsplit(".", 1)
# Load the module and get the class
module = importlib.import_module(module_name)
subclass = getattr(module, class_name)
# Return the subclass
return subclass
except Exception as e:
raise ExtensionError(
f"Failed to load extension '{impl_name}' for interface '{interface.__name__}'. \nDetail: {e}"
)