google/cloud/sql/connector/lazy.py (79 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import asyncio from datetime import datetime from datetime import timedelta from datetime import timezone import logging from typing import Optional from google.cloud.sql.connector.client import CloudSQLClient from google.cloud.sql.connector.connection_info import ConnectionInfo from google.cloud.sql.connector.connection_info import ConnectionInfoCache from google.cloud.sql.connector.connection_name import ConnectionName from google.cloud.sql.connector.refresh_utils import _refresh_buffer logger = logging.getLogger(name=__name__) class LazyRefreshCache(ConnectionInfoCache): """Cache that refreshes connection info when a caller requests a connection. Only refreshes the cache when a new connection is requested and the current certificate is close to or already expired. This is the recommended option for serverless environments. """ def __init__( self, conn_name: ConnectionName, client: CloudSQLClient, keys: asyncio.Future, enable_iam_auth: bool = False, ) -> None: """Initializes a LazyRefreshCache instance. Args: conn_name (ConnectionName): The Cloud SQL instance's connection name. client (CloudSQLClient): The Cloud SQL Client instance. keys (asyncio.Future): A future to the client's public-private key pair. enable_iam_auth (bool): Enables automatic IAM database authentication (Postgres and MySQL) as the default authentication method for all connections. """ self._conn_name = conn_name self._enable_iam_auth = enable_iam_auth self._keys = keys self._client = client self._lock = asyncio.Lock() self._cached: Optional[ConnectionInfo] = None self._needs_refresh = False self._closed = False @property def conn_name(self) -> ConnectionName: return self._conn_name @property def closed(self) -> bool: return self._closed async def force_refresh(self) -> None: """ Invalidates the cache and configures the next call to connect_info() to retrieve a fresh ConnectionInfo instance. """ async with self._lock: self._needs_refresh = True async def connect_info(self) -> ConnectionInfo: """Retrieves ConnectionInfo instance for establishing a secure connection to the Cloud SQL instance. """ async with self._lock: # If connection info is cached, check expiration. # Pad expiration with a buffer to give the client plenty of time to # establish a connection to the server with the certificate. if ( self._cached and not self._needs_refresh and datetime.now(timezone.utc) < (self._cached.expiration - timedelta(seconds=_refresh_buffer)) ): logger.debug( f"['{self._conn_name}']: Connection info " "is still valid, using cached info" ) return self._cached logger.debug( f"['{self._conn_name}']: Connection info " "refresh operation started" ) try: conn_info = await self._client.get_connection_info( self._conn_name, self._keys, self._enable_iam_auth, ) except Exception as e: logger.debug( f"['{self._conn_name}']: Connection info " f"refresh operation failed: {str(e)}" ) raise logger.debug( f"['{self._conn_name}']: Connection info " "refresh operation completed successfully" ) logger.debug( f"['{self._conn_name}']: Current certificate " f"expiration = {str(conn_info.expiration)}" ) self._cached = conn_info self._needs_refresh = False return conn_info async def close(self) -> None: """Close is a no-op and provided purely for a consistent interface with other cache types. """ self._closed = True return