def connect()

in ossdbtoolsservice/connection/connection_service.py [0:0]


    def connect(self, params: ConnectRequestParams) -> Optional[ConnectionCompleteParams]:
        """
        Open a connection using the given connection information.

        If a connection was already open, disconnect first. Return a connection response indicating
        whether the connection was successful
        """
        connection_info: ConnectionInfo = self.owner_to_connection_map.get(params.owner_uri)

        # If there is no saved connection or the saved connection's options do not match, create a new one
        if connection_info is None or connection_info.details.options != params.connection.options:
            if connection_info is not None:
                self._close_connections(connection_info)
            connection_info = ConnectionInfo(params.owner_uri, params.connection)
            self.owner_to_connection_map[params.owner_uri] = connection_info

        # Get the connection for the given type and build a response if it is present, otherwise open the connection
        connection = connection_info.get_connection(params.type)
        if connection is not None:
            return _build_connection_response(connection_info, params.type)

        # The connection doesn't exist yet. Cancel any ongoing connection and set up a cancellation token
        cancellation_key = (params.owner_uri, params.type)
        cancellation_token = CancellationToken()
        with self._cancellation_lock:
            if cancellation_key in self._cancellation_map:
                self._cancellation_map[cancellation_key].cancel()
            self._cancellation_map[cancellation_key] = cancellation_token

        # Get the type of server and config
        provider_name = self._service_provider.provider
        config = self._service_provider[constants.WORKSPACE_SERVICE_NAME].configuration
        try:
            # Get connection to DB server using the provided connection params
            connection: ServerConnection = ConnectionManager(provider_name, config, params.connection.options).get_connection()
        except Exception as err:
            return _build_connection_response_error(connection_info, params.type, err)
        finally:
            # Remove this thread's cancellation token if needed
            with self._cancellation_lock:
                if (cancellation_key in self._cancellation_map
                        and cancellation_token is self._cancellation_map[cancellation_key]):
                    del self._cancellation_map[cancellation_key]

        # If the connection was canceled, close it
        if cancellation_token.canceled:
            connection.close()
            return None

        # The connection was not canceled, so add the connection and respond
        connection_info.add_connection(params.type, connection)
        self._notify_on_connect(params.type, connection_info)
        return _build_connection_response(connection_info, params.type)