google/cloud/alloydb/connector/lazy.py (74 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import asyncio from datetime import datetime from datetime import timedelta from datetime import timezone import logging from typing import Optional from google.cloud.alloydb.connector.client import AlloyDBClient from google.cloud.alloydb.connector.connection_info import ConnectionInfo from google.cloud.alloydb.connector.instance import _parse_instance_uri from google.cloud.alloydb.connector.refresh_utils import _refresh_buffer logger = logging.getLogger(name=__name__) class LazyRefreshCache: """Cache that refreshes connection info when a caller requests a connection. Only refreshes the cache when a new connection is requested and the current certificate is close to or already expired. This is the recommended option for serverless environments. """ def __init__( self, instance_uri: str, client: AlloyDBClient, keys: asyncio.Future, ) -> None: """Initializes a LazyRefreshCache instance. Args: instance_connection_string (str): The AlloyDB Instance's connection URI. client (AlloyDBClient): The AlloyDB client instance. keys (asyncio.Future): A future to the client's public-private key pair. """ # validate and parse instance connection name self._project, self._region, self._cluster, self._name = _parse_instance_uri( instance_uri ) self._instance_uri = instance_uri self._keys = keys self._client = client self._lock = asyncio.Lock() self._cached: Optional[ConnectionInfo] = None self._needs_refresh = False async def force_refresh(self) -> None: """ Invalidates the cache and configures the next call to connect_info() to retrieve a fresh ConnectionInfo instance. """ async with self._lock: self._needs_refresh = True async def connect_info(self) -> ConnectionInfo: """Retrieves ConnectionInfo instance for establishing a secure connection to the AlloyDB instance. """ async with self._lock: # If connection info is cached, check expiration. # Pad expiration with a buffer to give the client plenty of time to # establish a connection to the server with the certificate. if ( self._cached and not self._needs_refresh and datetime.now(timezone.utc) < (self._cached.expiration - timedelta(seconds=_refresh_buffer)) ): logger.debug( f"['{self._instance_uri}']: Connection info " "is still valid, using cached info" ) return self._cached logger.debug( f"['{self._instance_uri}']: Connection info " "refresh operation started" ) try: conn_info = await self._client.get_connection_info( self._project, self._region, self._cluster, self._name, self._keys, ) except Exception as e: logger.debug( f"['{self._instance_uri}']: Connection info " f"refresh operation failed: {str(e)}" ) raise logger.debug( f"['{self._instance_uri}']: Connection info " "refresh operation completed successfully" ) logger.debug( f"['{self._instance_uri}']: Current certificate " f"expiration = {str(conn_info.expiration)}" ) self._cached = conn_info self._needs_refresh = False return conn_info async def close(self) -> None: """Close is a no-op and provided purely for a consistent interface with other cache types. """ pass