3-ai-native-e2e-sample/backend/routers/medication.py (129 lines of code) (raw):

from fastapi import APIRouter, HTTPException from fastapi.responses import StreamingResponse from azure.ai.projects import AIProjectClient from azure.ai.projects.models import BingGroundingTool, FunctionTool, SubmitToolOutputsAction, RequiredFunctionToolCall, ToolOutput from azure.identity import DefaultAzureCredential from agents.medication_functions import medication_functions import os import logging import json import time import asyncio from pydantic import BaseModel from typing import Optional logger = logging.getLogger(__name__) router = APIRouter(prefix="/agents", tags=["medication"]) class MedicationInfo(BaseModel): name: str notes: Optional[str] = None class MedicationAnalysis(BaseModel): analysis: str interactions: list[str] warnings: list[str] recommendations: list[str] @router.post("/medication/analyze_stream") async def analyze_medication_stream(info: MedicationInfo): async def event_generator(): try: # Initialize client project_client = AIProjectClient.from_connection_string( credential=DefaultAzureCredential(), conn_str=os.environ["PROJECT_CONNECTION_STRING"] ) logger.info(f"Starting streaming medication analysis for: {info.name}") # Get Bing connection and set up the tool bing_conn = project_client.connections.get(connection_name=os.environ["BING_CONNECTION_NAME"]) if not bing_conn: yield f"data: {json.dumps({'type': 'error', 'content': 'No Bing connection found.'})}\n\n" return bing_tool = BingGroundingTool(connection_id=bing_conn.id) # Update system prompt with the new instructions: # The assistant must use Bing to search for medication data, # then call the analyze_medication_info function to format the response. system_prompt = ( "You are a medication analysis assistant. Use Bing to retrieve accurate, up-to-date " "information about the medication specified by the user. Once you obtain Bing's results, " "call the analyze_medication_info function to structure your response as a JSON string. " "Return only a valid JSON string without any markdown or additional text." ) # Configure function tools functions = FunctionTool(functions=medication_functions) # Create agent with both Bing and function tools agent = project_client.agents.create_agent( model=os.environ["MODEL_DEPLOYMENT_NAME"], name="medication-analysis-stream", instructions=system_prompt, tools=[*bing_tool.definitions, *functions.definitions], headers={"x-ms-enable-preview": "true"} ) logger.info(f"Created agent with ID: {agent.id}") yield f"data: {json.dumps({'type': 'message', 'content': 'Agent created. Starting thread...'})}\n\n" # Create thread and initial message thread = project_client.agents.create_thread() message_content = f"Analyze the medication: {info.name}. {info.notes if info.notes else ''}" project_client.agents.create_message( thread_id=thread.id, role="user", content=message_content ) yield f"data: {json.dumps({'type': 'message', 'content': 'Thread created and message sent.'})}\n\n" # Create and start the run run = project_client.agents.create_run(thread_id=thread.id, agent_id=agent.id) logger.info(f"Created run with ID: {run.id}") yield f"data: {json.dumps({'type': 'message', 'content': 'Run initiated. Processing...'})}\n\n" # Poll for run completion max_retries = 60 retry_count = 0 while run.status in ["queued", "in_progress", "requires_action"]: await asyncio.sleep(1) run = project_client.agents.get_run(thread_id=thread.id, run_id=run.id) logger.info(f"Current run status: {run.status}") yield f"data: {json.dumps({'type': 'message', 'content': f'Run status: {run.status}'})}\n\n" retry_count += 1 if retry_count >= max_retries: yield f"data: {json.dumps({'type': 'error', 'content': 'Run timed out.'})}\n\n" return # Handle required function calls if any if run.status == "requires_action" and hasattr(run, "required_action") and isinstance(run.required_action, SubmitToolOutputsAction): tool_calls = run.required_action.submit_tool_outputs.tool_calls if not tool_calls: logger.error("No tool calls provided") break tool_outputs = [] for tool_call in tool_calls: if isinstance(tool_call, RequiredFunctionToolCall): try: output = functions.execute(tool_call) tool_outputs.append( ToolOutput( tool_call_id=tool_call.id, output=output ) ) yield f"data: {json.dumps({'type': 'message', 'content': f'Executed function call: {tool_call.function.name}'})}\n\n" except Exception as e: logger.error(f"Error executing tool call: {e}") if tool_outputs: run = project_client.agents.submit_tool_outputs_to_run( thread_id=thread.id, run_id=run.id, tool_outputs=tool_outputs ) continue # Once the run is complete, look for the assistant message if run.status == "failed": logger.error(f"Run failed: {run.last_error}") yield f"data: {json.dumps({'type': 'error', 'content': f'Run failed: {run.last_error}'})}\n\n" return messages = project_client.agents.list_messages(thread_id=thread.id) final_result = None for msg in reversed(messages.data): if msg.role == "assistant": # Loop through message content parts for content in msg.content: if hasattr(content, 'text') and content.text: text = content.text.value # If the response is formatted with markdown, extract JSON if text.startswith('```json'): text = text.split('```json')[1].split('```')[0].strip() try: final_result = json.loads(text) break except json.JSONDecodeError as e: logger.error(f"JSON decode error: {e}") if final_result: break if final_result is None: yield f"data: {json.dumps({'type': 'error', 'content': 'No valid response received from AI'})}\n\n" return # Yield final result as a completed message yield f"data: {json.dumps({'done': True, 'type': 'final', 'content': final_result})}\n\n" except Exception as ex: logger.error(f"Exception in streaming analysis: {ex}") yield f"data: {json.dumps({'type': 'error', 'content': str(ex)})}\n\n" return StreamingResponse(event_generator(), media_type="text/event-stream")