3-ai-native-e2e-sample/backend/agents/literature.py (106 lines of code) (raw):
from azure.ai.projects.models import AgentEventHandler, MessageDeltaChunk, ThreadMessage, ThreadRun, RunStep, RunStatus, RunStepType, RunStepStatus
from typing import Any, Generator, Dict, Union
import logging
import json
# Module-level logger, named after this module, used by every handler below.
logger = logging.getLogger(__name__)
class LiteratureChatHandler(AgentEventHandler):
    """Event handler for streaming literature chat responses.

    Every hook logs the event it receives. Hooks that have something to
    report return a JSON-encoded string; the others return ``None``.
    """

    def __init__(self):
        """Initialise the base handler and the run-status tracker."""
        super().__init__()
        # Most recent run status seen by this handler; read by on_done().
        self.current_run_status = None

    @staticmethod
    def _extract_text(content: Any) -> str:
        """Return the text carried by a message's ``content``.

        Args:
            content: Either an object exposing ``value`` directly, or a
                sequence of content items.

        Returns:
            ``content.value`` when that attribute exists; otherwise the
            ``text.value`` strings of the sequence items joined with
            newlines; otherwise ``str(content)`` as a last resort.
        """
        if hasattr(content, "value"):
            return content.value
        if isinstance(content, (list, tuple)):
            # NOTE(review): presumably each text item is a MessageTextContent
            # whose ``text`` is a MessageTextDetails with a ``value`` string --
            # confirm against the installed SDK version.
            parts = []
            for item in content:
                value = getattr(getattr(item, "text", None), "value", None)
                if isinstance(value, str):
                    parts.append(value)
            if parts:
                return "\n".join(parts)
        # Nothing recognisable as text: keep the historical fallback.
        return str(content)

    def on_message_delta(self, delta: MessageDeltaChunk) -> Union[str, None]:
        """Handle streaming message chunks.

        Returns:
            A JSON string ``{"type": "delta", "content": <text>}`` when the
            chunk carries text, otherwise ``None``.
        """
        if delta.text:
            logger.debug(f"Received message delta: {delta.text[:100]}...")
            return json.dumps({
                "type": "delta",
                "content": delta.text
            })
        return None

    def on_thread_message(self, message: ThreadMessage) -> Union[str, None]:
        """Handle complete thread messages.

        Returns:
            A JSON string ``{"type": "message", "content": <text>}`` when the
            message has non-blank text, a JSON string ``{"error": <reason>}``
            when extracting the text raises, otherwise ``None``.
        """
        if not message.content:
            return None
        logger.info(f"Received thread message: {message.id}")
        try:
            content = self._extract_text(message.content)
            if content.strip():  # Only send if there's actual content
                return json.dumps({
                    "type": "message",
                    "content": content
                })
        except Exception as e:
            logger.error(f"Error processing message content: {str(e)}")
            # NOTE(review): this payload has no "type" key, unlike the other
            # error payloads in this class; left as-is for existing clients.
            return json.dumps({"error": str(e)})
        return None

    def on_thread_run(self, run: ThreadRun) -> None:
        """Handle thread run status updates.

        Records the run's status so that on_done() can tell whether the
        stream ended after a failed run, then logs it.
        """
        self.current_run_status = run.status
        logger.info(f"Thread run status: {run.status}")

    def on_run_step(self, step: RunStep) -> None:
        """Handle individual run steps by logging their type and status."""
        logger.info(f"Run step type: {step.type}, Status: {step.status}")

    def on_run_status_changed(self, event_data):
        """Handle run status changed events.

        Args:
            event_data: Object with a ``get`` method; its ``"status"`` entry
                is stored as the current run status.

        Returns:
            A JSON error payload when the status equals ``RunStatus.FAILED``,
            otherwise ``None``.
        """
        # NOTE(review): this hook does not appear to be one the base
        # AgentEventHandler dispatches to -- confirm who calls it.
        status = event_data.get("status")
        self.current_run_status = status
        logger.info(f"Thread run status: {status}")
        if status == RunStatus.FAILED:
            return json.dumps({
                "type": "error",
                "content": "The assistant encountered an error processing your request."
            })
        return None

    def on_error(self, data: str) -> str:
        """Handle error events.

        Logs the error and returns it as a JSON string
        ``{"type": "error", "content": <message>}``.
        """
        error_msg = f"Error in literature chat: {data}"
        logger.error(error_msg)
        return json.dumps({
            "type": "error",
            "content": error_msg
        })

    def on_done(self) -> str:
        """Handle stream completion.

        Returns:
            A JSON error payload when the last recorded run status equals
            ``RunStatus.FAILED``, otherwise the JSON string ``{"done": true}``.
        """
        logger.info("Literature chat stream completed")
        if self.current_run_status == RunStatus.FAILED:
            return json.dumps({
                "type": "error",
                "content": "The conversation ended with an error."
            })
        return json.dumps({"done": True})

    def on_unhandled_event(self, event_type: str, event_data: Any) -> None:
        """Handle any unrecognized events by logging them at warning level."""
        logger.warning(f"Unhandled event type: {event_type}, Data: {event_data}")

    def on_run_step_started(self, event_data):
        """Handle run step started events.

        Logs the ``"step_type"`` entry of ``event_data`` and returns ``None``.
        """
        step_type = event_data.get("step_type")
        logger.info(f"Run step type: {step_type}, Status: RunStepStatus.IN_PROGRESS")
        return None

    def on_run_step_completed(self, event_data):
        """Handle run step completed events.

        Logs the ``"step_type"`` entry of ``event_data`` and returns ``None``.
        """
        step_type = event_data.get("step_type")
        logger.info(f"Run step type: {step_type}, Status: RunStepStatus.COMPLETED")
        return None

    def __call__(self, run_status=None, run_step=None, message=None):
        """Log the supplied event parts and serialise the message, if any.

        Args:
            run_status: Optional run status to log.
            run_step: Optional run step whose type and status are logged.
            message: Optional message whose content is serialised.

        Returns:
            A 3-tuple whose first two items are always ``None``. The third
            item is a JSON ``"message"`` payload when ``message`` has content,
            a JSON ``"error"`` payload when processing raises, otherwise
            ``None``.
        """
        try:
            if run_status:
                logger.info(f"Thread run status: {run_status}")
            if run_step:
                logger.info(f"Run step type: {run_step.type}, Status: {run_step.status}")
            if message:
                logger.info(f"Received thread message: {message.id}")
                if message.content:
                    try:
                        content = self._extract_text(message.content)
                        # Return properly formatted JSON response
                        return None, None, json.dumps({
                            "type": "message",
                            "content": content
                        })
                    except Exception as e:
                        logger.error(f"Error processing message content: {str(e)}")
                        return None, None, json.dumps({
                            "type": "error",
                            "content": f"Failed to process message: {str(e)}"
                        })
            if run_status == RunStatus.COMPLETED:
                logger.info("Literature chat stream completed")
        except Exception as e:
            logger.error(f"Error in literature chat handler: {str(e)}")
            return None, None, json.dumps({
                "type": "error",
                "content": f"Handler error: {str(e)}"
            })
        return None, None, None