def connect()

in aws_advanced_python_wrapper/wrapper.py [0:0]


    def connect(
            target: Union[None, str, Callable] = None,
            conninfo: str = "",
            *args: Any,
            **kwargs: Any) -> AwsWrapperConnection:
        if not target:
            raise Error(Messages.get("Wrapper.RequiredTargetDriver"))

        if not callable(target):
            raise Error(Messages.get("Wrapper.ConnectMethod"))
        target_func: Callable = target

        props: Properties = PropertiesUtils.parse_properties(conn_info=conninfo, **kwargs)
        logger.debug("Wrapper.Properties", PropertiesUtils.log_properties(PropertiesUtils.mask_properties(props)))

        telemetry_factory = DefaultTelemetryFactory(props)
        context = telemetry_factory.open_telemetry_context(__name__, TelemetryTraceLevel.TOP_LEVEL)

        try:
            driver_dialect_manager: DriverDialectManager = DriverDialectManager()
            driver_dialect = driver_dialect_manager.get_dialect(target_func, props)
            container: PluginServiceManagerContainer = PluginServiceManagerContainer()
            plugin_service = PluginServiceImpl(
                container, props, target_func, driver_dialect_manager, driver_dialect)
            plugin_manager: PluginManager = PluginManager(container, props, telemetry_factory)

            return AwsWrapperConnection(target_func, plugin_service, plugin_service, plugin_manager)
        except Exception as ex:
            context.set_exception(ex)
            context.set_success(False)
            raise ex
        finally:
            context.close_context()