orchestrator_client.py (66 lines of code) (raw):

import asyncio
import json
import logging
import os
from typing import AsyncIterator, Optional

import httpx
import requests
from azure.identity import (
    AzureCliCredential,
    ChainedTokenCredential,
    ManagedIdentityCredential,
)

# Upper bound, in seconds, for the management-API call that lists the function
# keys. Without an explicit timeout `requests` waits forever on a stalled peer.
_FUNCTION_KEY_REQUEST_TIMEOUT_SECONDS = 30


def get_managed_identity_token() -> str:
    """Return a bearer token for the Azure management API.

    The credential chain tries ``ManagedIdentityCredential`` first and falls
    back to ``AzureCliCredential``. The token is requested for the
    ``https://management.azure.com/.default`` scope.

    Any exception raised by the credential chain propagates to the caller.
    """
    credential = ChainedTokenCredential(
        ManagedIdentityCredential(),
        AzureCliCredential(),
    )
    return credential.get_token("https://management.azure.com/.default").token


def get_function_key() -> Optional[str]:
    """Fetch the ``default`` key of the ``orchestrator_streaming`` function.

    Reads ``AZURE_SUBSCRIPTION_ID``, ``AZURE_RESOURCE_GROUP_NAME`` and
    ``AZURE_ORCHESTRATOR_FUNC_NAME`` from the environment, then POSTs to the
    function's ``listKeys`` management endpoint using a bearer token from
    :func:`get_managed_identity_token`.

    Returns:
        The value stored under ``default`` in the response, or ``None`` when
        a required environment variable is missing, the request fails, the
        endpoint answers with a status >= 400, or the response body cannot be
        parsed. Every failure path logs an error before returning ``None``.

    Raises:
        Whatever :func:`get_managed_identity_token` raises; token acquisition
        errors are not handled here.
    """
    settings = {
        'AZURE_SUBSCRIPTION_ID': os.getenv('AZURE_SUBSCRIPTION_ID'),
        'AZURE_RESOURCE_GROUP_NAME': os.getenv('AZURE_RESOURCE_GROUP_NAME'),
        'AZURE_ORCHESTRATOR_FUNC_NAME': os.getenv('AZURE_ORCHESTRATOR_FUNC_NAME'),
    }
    # Fail early instead of sending a request whose URL contains "None".
    missing = [name for name, value in settings.items() if not value]
    if missing:
        logging.error(
            "[orchestrator_client] Failed to obtain function key. "
            "Missing environment variables: %s",
            ", ".join(missing),
        )
        return None

    subscription_id = settings['AZURE_SUBSCRIPTION_ID']
    resource_group = settings['AZURE_RESOURCE_GROUP_NAME']
    function_app_name = settings['AZURE_ORCHESTRATOR_FUNC_NAME']

    token = get_managed_identity_token()
    logging.info("[orchestrator_client] Obtaining function key.")

    # URL to get all function keys, including the default one
    request_url = f"https://management.azure.com/subscriptions/{subscription_id}/resourceGroups/{resource_group}/providers/Microsoft.Web/sites/{function_app_name}/functions/orchestrator_streaming/listKeys?api-version=2022-03-01"
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    try:
        response = requests.post(
            request_url,
            headers=request_headers,
            timeout=_FUNCTION_KEY_REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException as e:
        # Connection errors and timeouts follow the same "log and return
        # None" contract as HTTP-level failures.
        logging.error(
            "[orchestrator_client] Failed to obtain function key. Request error: %s",
            e,
        )
        return None

    # Check for HTTP errors
    if response.status_code >= 400:
        logging.error(
            "[orchestrator_client] Failed to obtain function key. "
            "HTTP status code: %s. Error details: %s",
            response.status_code,
            response.text,
        )
        return None

    try:
        return response.json()['default']
    except (KeyError, TypeError, ValueError) as e:
        # KeyError: no 'default' entry; TypeError: body is not a JSON object;
        # ValueError: body is not valid JSON.
        logging.error(
            "[orchestrator_client] Error when getting function key. Details: %s.",
            e,
        )
        return None


async def call_orchestrator_stream(
    conversation_id: str,
    question: str,
    auth_info: dict,
) -> AsyncIterator[str]:
    """Stream the orchestrator's answer to ``question`` as text chunks.

    POSTs a JSON payload to the URL in ``ORCHESTRATOR_STREAM_ENDPOINT`` and
    yields every non-empty text chunk of the response as it arrives.

    Args:
        conversation_id: Forwarded as ``conversation_id`` in the payload.
        question: Forwarded as ``question`` in the payload.
        auth_info: Mapping read for ``client_principal_id``,
            ``client_principal_name``, ``client_group_names`` and
            ``access_token``. Missing keys fall back to ``'no-auth'``,
            ``'anonymous'``, ``[]`` and ``None``. ``None`` is treated as an
            empty mapping.

    Yields:
        Non-empty text chunks of the streamed response.

    Raises:
        Exception: If the endpoint variable is unset, no function key could
            be obtained, or the orchestrator answers with a status >= 400.
    """
    url = os.getenv("ORCHESTRATOR_STREAM_ENDPOINT")
    if not url:
        raise Exception("ORCHESTRATOR_STREAM_ENDPOINT not set in environment variables")

    if 'localhost' in url:
        # A placeholder key is sent when the endpoint URL contains 'localhost'.
        function_key = "dont_need_function_key"
    else:
        # get_function_key() does blocking network I/O; run it in the default
        # executor so the event loop keeps serving other tasks meanwhile.
        loop = asyncio.get_running_loop()
        function_key = await loop.run_in_executor(None, get_function_key)

    if not function_key:
        raise Exception(f"Error getting function key. Conversation ID: {conversation_id or 'N/A'}")

    auth_info = auth_info or {}

    headers = {
        'Content-Type': 'application/json',
        'x-functions-key': function_key,
    }

    payload = {
        "conversation_id": conversation_id,
        "question": question,
        "client_principal_id": auth_info.get('client_principal_id', 'no-auth'),
        "client_principal_name": auth_info.get('client_principal_name', 'anonymous'),
        "client_group_names": auth_info.get('client_group_names', []),
        "access_token": auth_info.get('access_token'),
    }

    # timeout=None disables httpx's timeouts for this client.
    # NOTE(review): presumably intentional so long-running streamed answers
    # are not cut off - confirm.
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json=payload, headers=headers) as response:
            if response.status_code >= 400:
                raise Exception(f"Error calling orchestrator. HTTP status code: {response.status_code}. Details: {response.reason_phrase}")

            async for chunk in response.aiter_text():
                if chunk:
                    yield chunk