function_app.py (74 lines of code) (raw):

# function_app.py
import asyncio
import os
import json
import logging
import warnings

import azure.functions as func
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse, JSONResponse

from orchestration import RequestResponseOrchestrator, StreamingOrchestrator, OrchestratorConfig

# User Warning configuration
# USER_WARNING_FILTER selects the action applied to UserWarning:
#   ignore  - never show the warning
#   always  - always show the warning
#   error   - turn the warning into an exception
#   once    - show the warning only once
#   module  - show the warning only once per module
#   default - default Python behavior
user_warning_filter = os.environ.get('USER_WARNING_FILTER', 'ignore').lower()
warnings.filterwarnings(user_warning_filter, category=UserWarning)

# Logging configuration
# force=True replaces any handlers already installed on the root logger.
logging.basicConfig(level=os.environ.get('LOGLEVEL', 'INFO').upper(), force=True)


def _apply_library_log_levels() -> None:
    """Set the level of each third-party logger from its environment variable.

    Every row is ``(logger name, environment variable, default level)``; the
    rows are applied top to bottom, and the value read from the environment is
    upper-cased before being handed to ``Logger.setLevel``.
    """
    overrides = (
        ("azure", 'AZURE_LOGLEVEL', 'WARNING'),
        ("httpx", 'HTTPX_LOGLEVEL', 'ERROR'),
        ("httpcore", 'HTTPCORE_LOGLEVEL', 'ERROR'),
        ("openai._base_client", 'OPENAI_BASE_CLIENT_LOGLEVEL', 'WARNING'),
        ("urllib3", 'URLLIB3_LOGLEVEL', 'WARNING'),
        ("urllib3.connectionpool", 'URLLIB3_CONNECTIONPOOL_LOGLEVEL', 'WARNING'),
        ("openai", 'OPENAI_LOGLEVEL', 'WARNING'),
        ("autogen_core", 'AUTOGEN_CORE_LOGLEVEL', 'WARNING'),
        ("autogen_core.events", 'AUTOGEN_EVENTS_LOGLEVEL', 'WARNING'),
    )
    for logger_name, env_var, fallback in overrides:
        logging.getLogger(logger_name).setLevel(os.environ.get(env_var, fallback).upper())


_apply_library_log_levels()

# Let uvicorn's records bubble up to the root logger configured above.
logging.getLogger("uvicorn.error").propagate = True
logging.getLogger("uvicorn.access").propagate = True

# Create the Function App with the desired auth level.
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)

# Seconds of upstream silence after which a keep-alive is written to the stream.
HEARTBEAT_INTERVAL_SECONDS = 15


async def _read_json_object(req: Request):
    """Return the request body parsed as a JSON object, or ``None``.

    ``None`` is returned when the body cannot be parsed or when the parsed
    value is not a ``dict`` (the endpoints below call ``.get`` on it).
    """
    try:
        payload = await req.json()
    except ValueError:
        # NOTE(review): assumes req.json() reports malformed bodies as
        # json.JSONDecodeError / UnicodeDecodeError (both ValueError
        # subclasses), as Starlette's Request does -- confirm for this Request.
        return None
    return payload if isinstance(payload, dict) else None


def _client_principal(data):
    """Build the client-principal dict from the optional request fields."""
    return {
        "id": data.get("client_principal_id", "00000000-0000-0000-0000-000000000000"),
        "name": data.get("client_principal_name", "anonymous"),
        "group_names": data.get("client_group_names", ""),
    }


def _bad_request(message):
    """Return a 400 ``JSONResponse`` whose body is ``{"error": message}``."""
    return JSONResponse(content={"error": message}, status_code=400)


async def _with_heartbeat(source, interval=HEARTBEAT_INTERVAL_SECONDS):
    """Relay the truthy chunks of ``source``, emitting ``"\\n\\n"`` while it is silent.

    ``source`` is consumed by a single background task that feeds a bounded
    queue; this generator waits on the queue with a timeout, so a heartbeat is
    produced every ``interval`` seconds even when ``source`` yields nothing
    for a long time. (Checking the clock only when a chunk arrives cannot do
    that: no heartbeat would be sent during the gap itself.)

    Args:
        source: async iterable of chunks.
        interval: seconds to wait for a chunk before emitting a heartbeat.

    Yields:
        Each truthy chunk from ``source`` and ``"\\n\\n"`` heartbeats.

    Raises:
        Any exception raised while iterating ``source`` is re-raised here.
    """
    # maxsize=1 keeps the producer at most one chunk ahead of the consumer.
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)

    async def pump() -> None:
        # Queue items are (finished, payload); on the final item the payload
        # is the exception that ended the stream, or None for a normal end.
        try:
            async for chunk in source:
                await queue.put((False, chunk))
        except asyncio.CancelledError:
            # Consumer went away; do not try to enqueue anything else.
            raise
        except Exception as exc:
            await queue.put((True, exc))
        else:
            await queue.put((True, None))

    pump_task = asyncio.create_task(pump())
    heartbeat_count = 0
    try:
        while True:
            try:
                finished, payload = await asyncio.wait_for(queue.get(), timeout=interval)
            except asyncio.TimeoutError:
                heartbeat_count += 1
                logging.info("Sending heartbeat #%d", heartbeat_count)
                yield "\n\n"
                continue
            if finished:
                if payload is not None:
                    raise payload
                break
            if payload:
                yield payload
    finally:
        # No-op if the pump already finished; otherwise stops the producer
        # when the client disconnects or an error ends the stream.
        pump_task.cancel()


@app.route(route="orc", methods=[func.HttpMethod.POST])
async def orc(req: Request) -> JSONResponse:
    """Answer a question in request/response mode.

    Reads ``conversation_id``, ``question``, the optional client-principal
    fields and ``access_token`` from the JSON body, and returns the result of
    ``RequestResponseOrchestrator.answer`` as JSON. Responds with HTTP 400 when
    the body is not a JSON object or carries no ``question``.
    """
    data = await _read_json_object(req)
    if data is None:
        return _bad_request("request body must be a JSON object")

    question = data.get("question")
    if not question:
        return _bad_request("no question found in json input")

    orchestrator = RequestResponseOrchestrator(
        data.get("conversation_id"),
        OrchestratorConfig(),
        _client_principal(data),
        data.get("access_token"),
    )
    result = await orchestrator.answer(question)
    return JSONResponse(content=result)


@app.route(route="orcstream", methods=[func.HttpMethod.POST])
async def orchestrator_streaming(req: Request) -> StreamingResponse:
    """Answer a question as a ``text/event-stream`` response.

    Reads the same fields as ``orc`` plus ``optimize_for_audio`` (default
    ``False``), and streams the chunks produced by
    ``StreamingOrchestrator.answer``; heartbeats are interleaved while the
    orchestrator is silent. Responds with an HTTP 400 JSON error when the body
    is not a JSON object or carries no ``question``.
    """
    data = await _read_json_object(req)
    if data is None:
        return _bad_request("request body must be a JSON object")

    question = data.get("question")
    if not question:
        return _bad_request("no question found in json input")

    orchestrator = StreamingOrchestrator(
        data.get("conversation_id"),
        OrchestratorConfig(),
        _client_principal(data),
        data.get("access_token"),
    )
    orchestrator.set_optimize_for_audio(data.get("optimize_for_audio", False))

    async def stream_generator():
        logging.info("[orcstream_endpoint] Entering stream_generator")
        relay = _with_heartbeat(orchestrator.answer(question))
        try:
            async for piece in relay:
                # Chunks are forwarded exactly as the orchestrator produced them.
                yield piece
        finally:
            # Close the relay right away so its background task is cancelled
            # now rather than whenever the generator is garbage-collected.
            await relay.aclose()

    return StreamingResponse(stream_generator(), media_type="text/event-stream")