connectors/cosmosdb.py (74 lines of code) (raw):

import logging
import os
import time
from contextlib import asynccontextmanager
from typing import Optional

from azure.cosmos.aio import CosmosClient
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from azure.identity.aio import (
    AzureCliCredential,
    ChainedTokenCredential,
    ManagedIdentityCredential,
)

# NOTE: kept for backward compatibility with anything importing it; it is not
# currently read by CosmosDBClient, which passes no retry options to the SDK.
MAX_RETRIES = 10


class CosmosDBClient:
    """
    Thin async helper around the Azure Cosmos DB SDK.

    Every operation opens a short-lived credential and ``CosmosClient``,
    performs one call against a container and closes both again. The
    credential chain tries managed identity first and falls back to the
    Azure CLI login.

    No retry configuration is passed to ``CosmosClient``; whatever retry
    behaviour the SDK applies by default is what callers get.

    The read/create/update helpers are best-effort: they log failures and
    return ``None`` instead of raising.
    """

    def __init__(self):
        """
        Reads the Cosmos DB configuration from the environment.

        Uses ``AZURE_DB_ID`` (account name, used to build the endpoint URI)
        and ``AZURE_DB_NAME`` (database name). Missing values do not raise;
        a warning is logged because later calls will fail against a bogus
        endpoint or database.
        """
        self.db_id = os.environ.get("AZURE_DB_ID")
        self.db_name = os.environ.get("AZURE_DB_NAME")
        self.db_uri = f"https://{self.db_id}.documents.azure.com:443/"

        if not self.db_id or not self.db_name:
            logging.warning(
                "[cosmosdb] AZURE_DB_ID and/or AZURE_DB_NAME is not set; "
                "Cosmos DB requests will fail."
            )

    @asynccontextmanager
    async def _container_client(self, container_name):
        """Yield a container client; credential and client are closed on exit."""
        async with ChainedTokenCredential(
            ManagedIdentityCredential(), AzureCliCredential()
        ) as credential:
            async with CosmosClient(self.db_uri, credential=credential) as db_client:
                database = db_client.get_database_client(database=self.db_name)
                yield database.get_container_client(container_name)

    async def list_documents(self, container_name) -> list:
        """
        Lists all documents from the given container.

        Args:
            container_name: Name of the container to read.

        Returns:
            A list with every item returned by ``SELECT * FROM c``.

        Raises:
            Any exception raised by the SDK; this method does not catch errors.
        """
        async with self._container_client(container_name) as container_client:
            # partition_key=None: no partition filter is supplied for the query.
            items = container_client.query_items(
                query="SELECT * FROM c", partition_key=None
            )
            return [item async for item in items]

    async def get_document(self, container, key) -> Optional[dict]:
        """
        Reads a single document by id.

        The id is also passed as the partition key value
        (NOTE(review): this presumes containers are partitioned on ``/id`` — confirm).

        Args:
            container: Name of the container.
            key: Document id.

        Returns:
            The document, or ``None`` when it does not exist or the read fails.
        """
        async with self._container_client(container) as container_client:
            try:
                document = await container_client.read_item(item=key, partition_key=key)
            except CosmosResourceNotFoundError:
                logging.info(f"[cosmosdb] document {key} does not exist.")
                return None
            except Exception as e:
                # Auth, network or throttling problems are not "missing document";
                # report them as such but keep the best-effort None contract.
                logging.warning(f"[cosmosdb] could not retrieve document {key}: {e}")
                return None
            logging.info(f"[cosmosdb] document {key} retrieved.")
            return document

    async def create_document(self, container, key, body=None) -> Optional[dict]:
        """
        Creates a document with the given id.

        Args:
            container: Name of the container.
            key: Id assigned to the new document (overrides any ``id`` in ``body``).
            body: Optional document content. The caller's dict is not modified.

        Returns:
            The created document as returned by the SDK, or ``None`` on failure.
        """
        # Work on a copy so the caller's dict is never mutated.
        payload = {"id": key} if body is None else {**body, "id": key}

        async with self._container_client(container) as container_client:
            try:
                document = await container_client.create_item(body=payload)
            except Exception as e:
                logging.warning(f"[cosmosdb] error creating document {key}. Error: {e}")
                return None
            logging.info(f"[cosmosdb] document {key} created.")
            return document

    async def update_document(self, container, document) -> Optional[dict]:
        """
        Replaces an existing document, identified by ``document["id"]``.

        Args:
            container: Name of the container.
            document: Full replacement document; must contain an ``id`` key.

        Returns:
            The replaced document as returned by the SDK, or ``None`` on
            failure (including a missing ``id`` key).
        """
        async with self._container_client(container) as container_client:
            try:
                # The id lookup stays inside the try so a missing key is
                # logged and reported as None like any other failure.
                updated = await container_client.replace_item(
                    item=document["id"], body=document
                )
            except Exception as e:
                logging.warning(f"[cosmosdb] could not update document: {e}", exc_info=True)
                return None
            logging.info("[cosmosdb] document updated.")
            return updated