in google/cloud/alloydb/connector/connector.py [0:0]
def connect(self, instance_uri: str, driver: str, **kwargs: Any) -> Any:
    """
    Synchronously open a DBAPI connection to an AlloyDB instance.

    This is the blocking counterpart of ``connect_async``: the coroutine
    is scheduled on the connector's event loop (``self._loop``) and the
    calling thread waits until it has finished.

    NOTE(review): certificate refresh, instance IP lookup and the TLS
    connection are presumably handled inside ``connect_async``; nothing
    in this method does that work directly.

    Args:
        instance_uri (str): The instance URI of the AlloyDB instance.
            ex. projects/<PROJECT>/locations/<REGION>/clusters/<CLUSTER>/instances/<INSTANCE>
        driver (str): A string representing the database driver to
            connect with. Supported drivers are pg8000.
        **kwargs: Any database driver-specific arguments needed to fine
            tune the connection; forwarded unchanged to
            ``connect_async``.

    Returns:
        connection: The value produced by ``connect_async``, i.e. a DBAPI
            connection to the specified AlloyDB instance.

    Raises:
        ClosedConnectorError: If the connector has already been closed.
    """
    if not self._closed:
        # Hand the coroutine to the connector's event loop, then block
        # the calling thread until the loop has produced a result (or
        # re-raise whatever the coroutine raised).
        pending = self.connect_async(instance_uri, driver, **kwargs)
        return asyncio.run_coroutine_threadsafe(pending, self._loop).result()

    raise ClosedConnectorError(
        "Connection attempt failed because the connector has already been closed."
    )