in aws_advanced_python_wrapper/wrapper.py [0:0]
def connect(
        target: Union[None, str, Callable] = None,
        conninfo: str = "",
        *args: Any,
        **kwargs: Any) -> AwsWrapperConnection:
    """Create an AwsWrapperConnection around a target driver connect callable.

    Args:
        target: The callable used to connect through the underlying driver.
            It must be truthy and callable; any other value (including a
            non-empty ``str``) is rejected.
        conninfo: Connection string, handed to
            ``PropertiesUtils.parse_properties`` as ``conn_info``.
        *args: Accepted for signature compatibility; not read by this function.
        **kwargs: Extra connection properties, forwarded to
            ``PropertiesUtils.parse_properties``.

    Returns:
        An ``AwsWrapperConnection`` built from the target callable, the plugin
        service and the plugin manager.

    Raises:
        Error: If ``target`` is falsy or is not callable.
        Exception: Any exception raised while the dialect, plugin service,
            plugin manager or connection is being built is recorded on the
            telemetry context and then re-raised unchanged.
    """
    if not target:
        raise Error(Messages.get("Wrapper.RequiredTargetDriver"))
    if not callable(target):
        raise Error(Messages.get("Wrapper.ConnectMethod"))
    target_func: Callable = target

    props: Properties = PropertiesUtils.parse_properties(conn_info=conninfo, **kwargs)
    # Properties are passed through mask_properties before being logged.
    logger.debug(
        "Wrapper.Properties",
        PropertiesUtils.log_properties(PropertiesUtils.mask_properties(props)))

    telemetry_factory = DefaultTelemetryFactory(props)
    context = telemetry_factory.open_telemetry_context(__name__, TelemetryTraceLevel.TOP_LEVEL)
    try:
        driver_dialect_manager: DriverDialectManager = DriverDialectManager()
        driver_dialect = driver_dialect_manager.get_dialect(target_func, props)
        container: PluginServiceManagerContainer = PluginServiceManagerContainer()
        plugin_service = PluginServiceImpl(
            container, props, target_func, driver_dialect_manager, driver_dialect)
        plugin_manager: PluginManager = PluginManager(container, props, telemetry_factory)
        # NOTE(review): plugin_service is intentionally passed for both the
        # second and third arguments - presumably it fills two roles; confirm
        # against AwsWrapperConnection.__init__.
        return AwsWrapperConnection(target_func, plugin_service, plugin_service, plugin_manager)
    except Exception as ex:
        # Record the failure on the telemetry context before propagating it.
        context.set_exception(ex)
        context.set_success(False)
        # Bare raise re-raises the active exception without adding a
        # redundant frame to its traceback.
        raise
    finally:
        # The telemetry context is closed on both the success and failure paths.
        context.close_context()