in ossdbtoolsservice/connection/connection_service.py [0:0]
def connect(self, params: ConnectRequestParams) -> Optional[ConnectionCompleteParams]:
"""
Open a connection using the given connection information.
If a connection was already open, disconnect first. Return a connection response indicating
whether the connection was successful
"""
connection_info: ConnectionInfo = self.owner_to_connection_map.get(params.owner_uri)
# If there is no saved connection or the saved connection's options do not match, create a new one
if connection_info is None or connection_info.details.options != params.connection.options:
if connection_info is not None:
self._close_connections(connection_info)
connection_info = ConnectionInfo(params.owner_uri, params.connection)
self.owner_to_connection_map[params.owner_uri] = connection_info
# Get the connection for the given type and build a response if it is present, otherwise open the connection
connection = connection_info.get_connection(params.type)
if connection is not None:
return _build_connection_response(connection_info, params.type)
# The connection doesn't exist yet. Cancel any ongoing connection and set up a cancellation token
cancellation_key = (params.owner_uri, params.type)
cancellation_token = CancellationToken()
with self._cancellation_lock:
if cancellation_key in self._cancellation_map:
self._cancellation_map[cancellation_key].cancel()
self._cancellation_map[cancellation_key] = cancellation_token
# Get the type of server and config
provider_name = self._service_provider.provider
config = self._service_provider[constants.WORKSPACE_SERVICE_NAME].configuration
try:
# Get connection to DB server using the provided connection params
connection: ServerConnection = ConnectionManager(provider_name, config, params.connection.options).get_connection()
except Exception as err:
return _build_connection_response_error(connection_info, params.type, err)
finally:
# Remove this thread's cancellation token if needed
with self._cancellation_lock:
if (cancellation_key in self._cancellation_map
and cancellation_token is self._cancellation_map[cancellation_key]):
del self._cancellation_map[cancellation_key]
# If the connection was canceled, close it
if cancellation_token.canceled:
connection.close()
return None
# The connection was not canceled, so add the connection and respond
connection_info.add_connection(params.type, connection)
self._notify_on_connect(params.type, connection_info)
return _build_connection_response(connection_info, params.type)