3-ai-native-e2e-sample/backend/routers/literature.py:
import json
import logging
import os

from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import StreamingResponse
from azure.ai.projects import AIProjectClient
from azure.ai.projects.models import AzureAISearchTool, ConnectionType
from azure.identity import DefaultAzureCredential

from agents.literature import LiteratureChatHandler

logger = logging.getLogger(__name__)
router = APIRouter()
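
# This router is presumably mounted on the app elsewhere in the sample,
# along the lines of (a sketch, not this file's code):
#
#     from fastapi import FastAPI
#     from routers import literature
#
#     app = FastAPI()
#     app.include_router(literature.router)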


@router.post("/literature-chat")
async def chat_literature(request: Request):
    """
    Stream chat responses about literature using Azure AI Search.

    Args:
        request: The incoming request; its JSON body must contain a
            "message" field with the user's chat message.

    Returns:
        StreamingResponse: Server-sent events (SSE) stream of chat responses.

    Raises:
        HTTPException: 400 if the message field is missing, 500 if the
            request cannot be processed.
    """
    try:
        body = await request.json()
        message = body.get("message")
        if not message:
            raise HTTPException(status_code=400, detail="Message field is required")

        # Initialize the AI Project client. PROJECT_CONNECTION_STRING must be
        # set; a missing variable raises KeyError and surfaces as a 500 below.
        project_client = AIProjectClient.from_connection_string(
            credential=DefaultAzureCredential(),
            conn_str=os.environ["PROJECT_CONNECTION_STRING"],
        )
        # Look up the project's default Azure AI Search connection, including
        # its credentials so the tool can query the index.
        search_conn = project_client.connections.get_default(
            connection_type=ConnectionType.AZURE_AI_SEARCH,
            include_credentials=True,
        )
        if not search_conn:
            raise ValueError("No default Azure AI Search connection found")

        # Point the AI Search tool at the literature index on that connection.
        ai_search_tool = AzureAISearchTool(
            index_connection_id=search_conn.id,
            index_name="literature-index",
        )
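        # NOTE: "literature-index" is assumed to already exist in the
        # connected search service; this tool queries the index but does
        # not create or populate it.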
        # Create a per-request chat agent wired to the search tool. The
        # preview header opts in to tool support that is still in preview.
        agent = project_client.agents.create_agent(
            model=os.environ["MODEL_DEPLOYMENT_NAME"],
            name="literature-chat",
            instructions="""You are a Literature Research Assistant. Help users find and understand scientific literature.
Use the search tool to find relevant papers and provide evidence-based responses.
Always cite your sources and provide context for your answers.""",
            tools=ai_search_tool.definitions,
            tool_resources=ai_search_tool.resources,
            headers={"x-ms-enable-preview": "true"},
        )
        logger.info(f"Created agent with ID: {agent.id}")
        # Create a thread and post the user's message to it.
        thread = project_client.agents.create_thread()
        logger.info(f"Created thread with ID: {thread.id}")

        message_obj = project_client.agents.create_message(
            thread_id=thread.id,
            role="user",
            content=message,
        )
        logger.info(f"Created message with ID: {message_obj.id}")
        # Start a streaming run; the event handler turns run events into
        # JSON payloads for the SSE stream below.
        stream = project_client.agents.create_stream(
            thread_id=thread.id,
            agent_id=agent.id,
            event_handler=LiteratureChatHandler(),
        )
        async def generate_events():
            try:
                # NOTE: the SDK stream is synchronous, so iterating it here
                # blocks the event loop between events.
                with stream as active_stream:
                    for event in active_stream:
                        # With an event handler attached, the stream yields
                        # (event_type, event_data, handler_result) tuples.
                        if isinstance(event, tuple) and len(event) == 3:
                            _, _, response = event
                            if response:
                                yield f"data: {response}\n\n"
                                # Stop streaming if the handler reported an error.
                                try:
                                    resp_data = json.loads(response)
                                    if resp_data.get("type") == "error":
                                        logger.error(f"Stream error: {resp_data.get('content')}")
                                        break
                                except (json.JSONDecodeError, TypeError):
                                    # Non-JSON payloads are forwarded as-is.
                                    pass
            except Exception as e:
                logger.error(f"Stream error: {str(e)}")
                error_msg = json.dumps({"type": "error", "content": str(e)})
                yield f"data: {error_msg}\n\n"
            finally:
                # Best-effort cleanup: each request creates a throwaway agent.
                try:
                    project_client.agents.delete_agent(agent.id)
                except Exception:
                    logger.warning(f"Failed to delete agent {agent.id}")
                # Always emit a terminal event so clients know to disconnect.
                yield "data: {\"done\": true}\n\n"
        return StreamingResponse(
            generate_events(),
            media_type="text/event-stream",
        )
    except HTTPException:
        # Re-raise the 400 above unchanged rather than masking it as a 500
        # in the generic handler below.
        raise
    except Exception as e:
        logger.error(f"Error in literature chat: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to process literature chat: {str(e)}",
        ) from e
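

# A minimal sketch (not part of the sample) of a client consuming this
# endpoint's SSE stream. The base URL is an assumption, and `httpx` is an
# extra dependency used only for this demo.
if __name__ == "__main__":
    import httpx

    payload = {"message": "Find recent papers on mRNA vaccine delivery"}
    # Stream the response so events are printed as they arrive instead of
    # being buffered until the run finishes.
    with httpx.stream(
        "POST",
        "http://localhost:8000/literature-chat",
        json=payload,
        timeout=None,
    ) as resp:
        for line in resp.iter_lines():
            # SSE events from this router look like: data: {...}
            if line.startswith("data: "):
                print(line[len("data: "):])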