google/cloud/alloydb/connector/asyncpg.py (27 lines of code) (raw):

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ssl
from typing import Any, TYPE_CHECKING

# Port handed to ``asyncpg.connect`` for every connection made by ``connect``.
SERVER_PROXY_PORT = 5433

if TYPE_CHECKING:
    # Only needed for the return annotation; the runtime import happens lazily
    # inside ``connect`` so that asyncpg stays an optional dependency.
    import asyncpg


async def connect(
    ip_address: str, ctx: ssl.SSLContext, **kwargs: Any
) -> "asyncpg.Connection":
    """Helper function to create an asyncpg DB-API connection object.

    :type ip_address: str
    :param ip_address: A string containing an IP address for the AlloyDB
        instance.

    :type ctx: ssl.SSLContext
    :param ctx: An SSLContext object created from the AlloyDB server CA
        cert and ephemeral cert.

    :type kwargs: Any
    :param kwargs: Keyword arguments for establishing asyncpg connection
        object to AlloyDB instance. ``user`` and ``db`` are required;
        ``password`` is optional and defaults to ``None``. Any remaining
        keyword arguments are forwarded unchanged to ``asyncpg.connect``.

    :rtype: asyncpg.Connection
    :returns: An asyncpg.Connection object to an AlloyDB instance.

    :raises ImportError: If the ``asyncpg`` package cannot be imported.
    :raises KeyError: If ``user`` or ``db`` is missing from ``kwargs``.
    """
    try:
        import asyncpg
    except ImportError as exc:
        # Chain the original failure so the real reason the import broke
        # (missing package, broken native extension, ...) is not lost.
        raise ImportError(
            'Unable to import module "asyncpg." Please install and try again.'
        ) from exc

    # Pull out the arguments that are renamed or given defaults here, so they
    # are not passed a second time through ``**kwargs`` below.
    user = kwargs.pop("user")
    db = kwargs.pop("db")
    # asyncpg accepts ``password=None``; do not force callers to supply one.
    passwd = kwargs.pop("password", None)

    return await asyncpg.connect(
        user=user,
        database=db,
        password=passwd,
        host=ip_address,
        port=SERVER_PROXY_PORT,
        ssl=ctx,
        # Per the asyncpg docs, this skips the PostgreSQL STARTTLS negotiation
        # and opens the TLS session directly.
        direct_tls=True,
        **kwargs,
    )