google/cloud/alloydb/connector/client.py (150 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import annotations import asyncio import logging from typing import Optional, TYPE_CHECKING from cryptography import x509 from google.api_core.client_options import ClientOptions from google.api_core.gapic_v1.client_info import ClientInfo from google.auth.credentials import TokenState from google.auth.transport import requests import google.cloud.alloydb_v1beta as v1beta from google.protobuf import duration_pb2 from google.cloud.alloydb.connector.connection_info import ConnectionInfo from google.cloud.alloydb.connector.version import __version__ as version if TYPE_CHECKING: from google.auth.credentials import Credentials USER_AGENT: str = f"alloydb-python-connector/{version}" API_VERSION: str = "v1beta" logger = logging.getLogger(name=__name__) def _format_user_agent( driver: Optional[str], custom_user_agent: Optional[str], ) -> str: """ Appends user-defined user agents to the base default agent. """ agent = f"{USER_AGENT}+{driver}" if driver else USER_AGENT if custom_user_agent and isinstance(custom_user_agent, str): agent = f"{agent} {custom_user_agent}" return agent class AlloyDBClient: def __init__( self, alloydb_api_endpoint: str, quota_project: Optional[str], credentials: Credentials, client: Optional[v1beta.AlloyDBAdminAsyncClient] = None, driver: Optional[str] = None, user_agent: Optional[str] = None, ) -> None: """ Establish the client to be used for AlloyDB API requests. Args: alloydb_api_endpoint (str): Base URL to use when calling the AlloyDB API endpoint. quota_project (str): The Project ID for an existing Google Cloud project. The project specified is used for quota and billing purposes. credentials (google.auth.credentials.Credentials): A credentials object created from the google-auth Python library. Must have the AlloyDB Admin scopes. For more info check out https://google-auth.readthedocs.io/en/latest/. client (v1beta.AlloyDBAdminAsyncClient): Async client used to make requests to AlloyDB APIs. Optional, defaults to None and creates new client. driver (str): Database driver to be used by the client. user_agent (str): The custom user-agent string to use in the HTTP header when making requests to AlloyDB APIs. Optional, defaults to None and uses a pre-defined one. """ user_agent = _format_user_agent(driver, user_agent) # TODO(rhatgadkar-goog): Rollback the PR of deciding between creating # AlloyDBAdminClient or AlloyDBAdminAsyncClient when either # https://github.com/grpc/grpc/issues/25364 is resolved or an async REST # transport for AlloyDBAdminAsyncClient gets introduced. # The issue is that the async gRPC transport does not work with multiple # event loops in the same process. So all calls to the AlloyDB Admin # API, even from multiple threads, need to be made to a single-event # loop. See https://github.com/GoogleCloudPlatform/alloydb-python-connector/issues/435 # for more details. self._is_sync = False if client: self._client = client elif driver == "pg8000": self._client = v1beta.AlloyDBAdminClient( credentials=credentials, transport="grpc", client_options=ClientOptions( api_endpoint=alloydb_api_endpoint, quota_project_id=quota_project, ), client_info=ClientInfo( user_agent=user_agent, ), ) self._is_sync = True else: self._client = v1beta.AlloyDBAdminAsyncClient( credentials=credentials, transport="grpc_asyncio", client_options=ClientOptions( api_endpoint=alloydb_api_endpoint, quota_project_id=quota_project, ), client_info=ClientInfo( user_agent=user_agent, ), ) self._credentials = credentials # asyncpg does not currently support using metadata exchange # only use metadata exchange for pg8000 driver self._use_metadata = True if driver == "pg8000" else False self._user_agent = user_agent async def _get_metadata( self, project: str, region: str, cluster: str, name: str, ) -> dict[str, Optional[str]]: """ Fetch the metadata for a given AlloyDB instance. Call the AlloyDB APIs connectInfo method to retrieve the information about an AlloyDB instance that is used to create secure connections. Args: project (str): Google Cloud project ID that the AlloyDB instance resides in. region (str): Google Cloud region of the AlloyDB instance. cluster (str): The name of the AlloyDB cluster. name (str): The name of the AlloyDB instance. Returns: dict: IP addresses of the AlloyDB instance. """ parent = ( f"projects/{project}/locations/{region}/clusters/{cluster}/instances/{name}" ) req = v1beta.GetConnectionInfoRequest(parent=parent) if self._is_sync: resp = self._client.get_connection_info(request=req) else: resp = await self._client.get_connection_info(request=req) # Remove trailing period from PSC DNS name. psc_dns = resp.psc_dns_name if psc_dns: psc_dns = psc_dns.rstrip(".") return { "PRIVATE": resp.ip_address, "PUBLIC": resp.public_ip_address, "PSC": psc_dns, } async def _get_client_certificate( self, project: str, region: str, cluster: str, pub_key: str, ) -> tuple[str, list[str]]: """ Fetch a client certificate for the given AlloyDB cluster. Call the AlloyDB API's generateClientCertificate method to create a signed TLS certificate that is authorized to connect via the AlloyDB instance's serverside proxy. The cert is valid for twenty-four hours. Args: project (str): Google Cloud project ID that the AlloyDB instance resides in. region (str): Google Cloud region of the AlloyDB instance. cluster (str): The name of the AlloyDB cluster. pub_key (str): PEM-encoded client public key. Returns: tuple[str, list[str]]: tuple containing the CA certificate and certificate chain for the AlloyDB instance. """ parent = f"projects/{project}/locations/{region}/clusters/{cluster}" dur = duration_pb2.Duration() dur.seconds = 3600 req = v1beta.GenerateClientCertificateRequest( parent=parent, cert_duration=dur, public_key=pub_key, use_metadata_exchange=self._use_metadata, ) if self._is_sync: resp = self._client.generate_client_certificate(request=req) else: resp = await self._client.generate_client_certificate(request=req) return (resp.ca_cert, resp.pem_certificate_chain) async def get_connection_info( self, project: str, region: str, cluster: str, name: str, keys: asyncio.Future, ) -> ConnectionInfo: """Immediately performs a full refresh operation using the AlloyDB API. Args: project (str): The name of the project the AlloyDB instance is located in. region (str): The region the AlloyDB instance is located in. cluster (str): The cluster the AlloyDB instance is located in. name (str): Name of the AlloyDB instance. keys (asyncio.Future): A future to the client's public-private key pair. Returns: ConnectionInfo: All the information required to connect securely to the AlloyDB instance. """ priv_key, pub_key = await keys # before making AlloyDB API calls, refresh creds if required if not self._credentials.token_state == TokenState.FRESH: self._credentials.refresh(requests.Request()) # fetch metadata metadata_task = asyncio.create_task( self._get_metadata( project, region, cluster, name, ) ) # generate client and CA certs certs_task = asyncio.create_task( self._get_client_certificate( project, region, cluster, pub_key, ) ) ip_addrs, certs = await asyncio.gather(metadata_task, certs_task) # unpack certs ca_cert, cert_chain = certs # get expiration from client certificate cert_obj = x509.load_pem_x509_certificate(cert_chain[0].encode("UTF-8")) expiration = cert_obj.not_valid_after_utc return ConnectionInfo( cert_chain, ca_cert, priv_key, ip_addrs, expiration, )