in aws_advanced_python_wrapper/wrapper.py [0:0]
def __init__(
        self,
        target_func: Callable,
        host_list_provider_service: HostListProviderService,
        plugin_service: PluginService,
        plugin_manager: PluginManager):
    """Set up the host list provider and make sure a current connection exists.

    The plugin service and plugin manager are stored on the instance, the
    host list provider is built from the database dialect's supplier and
    installed on the plugin service, the plugin manager's host provider is
    initialised, and the host list is refreshed. If the plugin service
    already has a current connection nothing else happens; otherwise a
    connection is opened through the plugin manager and registered as the
    current connection.

    Args:
        target_func: Callable forwarded unchanged to ``plugin_manager.connect``.
        host_list_provider_service: Passed to the host list provider
            supplier and to ``plugin_manager.init_host_provider``.
        plugin_service: Source of the database dialect, driver dialect,
            props, and initial host info; receives the host list provider
            and the resulting current connection.
        plugin_manager: Used to initialise the host provider and to open
            the connection.

    Raises:
        AwsWrapperError: If the plugin service has no initial connection
            host info, or if ``plugin_manager.connect`` returns a falsy
            value.
    """
    self._plugin_service = plugin_service
    self._plugin_manager = plugin_manager

    # Build the host list provider from the dialect's supplier and install it
    # on the plugin service before the plugin manager is initialised.
    dialect = plugin_service.database_dialect
    provider_supplier = dialect.get_host_list_provider_supplier()
    plugin_service.host_list_provider = provider_supplier(
        host_list_provider_service, plugin_service.props)

    plugin_manager.init_host_provider(
        plugin_service.props, host_list_provider_service)
    plugin_service.refresh_host_list()

    # A connection is already in place: there is nothing left to open.
    if plugin_service.current_connection is not None:
        return

    if plugin_service.initial_connection_host_info is None:
        raise AwsWrapperError(
            Messages.get("AwsWrapperConnection.InitialHostInfoNone"))

    # NOTE(review): the trailing True is passed positionally; presumably it
    # marks this as the initial connection -- confirm against
    # PluginManager.connect.
    connection = plugin_manager.connect(
        target_func,
        plugin_service.driver_dialect,
        plugin_service.initial_connection_host_info,
        plugin_service.props,
        True,
    )
    if not connection:
        raise AwsWrapperError(
            Messages.get("AwsWrapperConnection.ConnectionNotOpen"))

    # The host info is read from the service again here rather than cached
    # before connect(), so whatever value the service holds now is the one
    # registered. NOTE(review): confirm whether connect() may update it.
    plugin_service.set_current_connection(
        connection, plugin_service.initial_connection_host_info)