orc/orchestrator.py (77 lines of code) (raw):

import logging import os import time import uuid from azure.cosmos.aio import CosmosClient from datetime import datetime from shared.util import format_answer from azure.identity.aio import ManagedIdentityCredential, AzureCliCredential, ChainedTokenCredential import orc.code_orchestration as code_orchestration # logging level logging.getLogger('azure').setLevel(logging.WARNING) logging.getLogger('azure.cosmos').setLevel(logging.WARNING) LOGLEVEL = os.environ.get('LOGLEVEL', 'DEBUG').upper() logging.basicConfig(level=LOGLEVEL) # Constants set from environment variables (external services credentials and configuration) # Cosmos DB AZURE_DB_ID = os.environ.get("AZURE_DB_ID") AZURE_DB_NAME = os.environ.get("AZURE_DB_NAME") AZURE_DB_URI = f"https://{AZURE_DB_ID}.documents.azure.com:443/" # AOAI AZURE_OPENAI_STREAM = os.environ.get("AZURE_OPENAI_STREAM") or "false" AZURE_OPENAI_STREAM = True if AZURE_OPENAI_STREAM.lower() == "true" else False ANSWER_FORMAT = "html" # html, markdown, none async def get_credentials(): async with ChainedTokenCredential( ManagedIdentityCredential(), AzureCliCredential() ) as credential: return credential def generate_security_ids(client_principal): security_ids = 'anonymous' if client_principal is not None: group_names = client_principal['group_names'] security_ids = f"{client_principal['id']}" + (f",{group_names}" if group_names else "") return security_ids async def run(conversation_id, ask, client_principal): start_time = time.time() # 1) Get conversation stored in CosmosDB # create conversation_id if not provided if conversation_id is None or conversation_id == "": conversation_id = str(uuid.uuid4()) logging.info(f"[orchestrator] {conversation_id} conversation_id is Empty, creating new conversation_id.") logging.info(f"[orchestrator] {conversation_id} starting conversation flow.") # get conversation #credential = get_credentials() async with ChainedTokenCredential( ManagedIdentityCredential(), AzureCliCredential() ) as credential: async with CosmosClient(AZURE_DB_URI, credential=credential) as db_client: db = db_client.get_database_client(database=AZURE_DB_NAME) container = db.get_container_client('conversations') try: conversation = await container.read_item(item=conversation_id, partition_key=conversation_id) logging.info(f"[orchestrator] conversation {conversation_id} retrieved.") except Exception as e: logging.info(f"[orchestrator] customer sent an inexistent conversation_id, saving new conversation_id") conversation = await container.create_item(body={"id": conversation_id}) # get conversation data conversation_data = conversation.get('conversation_data', {'start_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'interactions': []}) # history history = conversation.get('history', []) history.append({"role": "user", "content": ask}) # 2) get answer and sources logging.info(f"[orchestrator] executing RAG retrieval using code orchestration") security_ids = generate_security_ids(client_principal) answer_dict = await code_orchestration.get_answer(history, security_ids,conversation_id) # 3) update and save conversation (containing history and conversation data) # history if answer_dict['answer_generated_by'] == 'content_filters_check': history[-1]['content'] = '<FILTERED BY MODEL>' history.append({"role": "assistant", "content": answer_dict['answer']}) conversation['history'] = history # conversation data response_time = round(time.time() - start_time,2) interaction = { 'user_id': client_principal['id'], 'user_name': client_principal['name'], 'response_time': response_time } interaction.update(answer_dict) conversation_data['interactions'].append(interaction) conversation['conversation_data'] = conversation_data conversation = await container.replace_item(item=conversation, body=conversation) # 4) return answer result = {"conversation_id": conversation_id, "answer": format_answer(interaction['answer'], ANSWER_FORMAT), "data_points": interaction['sources'] if 'sources' in interaction else '', "thoughts": f"Searched for:\n{interaction['search_query']}\n\nPrompt:\n{interaction['prompt']}"} logging.info(f"[orchestrator] {conversation_id} finished conversation flow. {response_time} seconds. answer: {interaction['answer'][:30]}") return result