3-ai-native-e2e-sample/backend/agents/trials/event_consumer/consumer.py (58 lines of code) (raw):

"""
Trial Events Consumer

This module consumes trial events from Azure Event Hubs and passes them to
the multi-agent coordinator for analysis. The consumer bridges the gap
between event generation and agent processing.
"""

import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Dict, Any, Optional

from azure.eventhub.aio import EventHubConsumerClient
from opentelemetry import trace

from utils.telemetry import tracer
from .coordinator import TrialAgentCoordinator
from config import EVENT_HUBS_CONFIG

logger = logging.getLogger(__name__)


class TrialEventsConsumer:
    """Consumes trial events and processes them through the multi-agent system."""

    def __init__(self, coordinator: TrialAgentCoordinator):
        """
        Initialize the consumer with a coordinator.

        The coordinator processes the event with its team-led agent system,
        integrating telemetry and aggregated analysis.

        Args:
            coordinator: Object whose ``process_trial_event`` coroutine is
                awaited once for every received event.

        Raises:
            KeyError: If ``EVENT_HUBS_CONFIG`` lacks ``connection_string``,
                ``consumer_group`` or ``eventhub_name``.
        """
        self.coordinator = coordinator
        self.consumer = EventHubConsumerClient.from_connection_string(
            conn_str=EVENT_HUBS_CONFIG["connection_string"],
            consumer_group=EVENT_HUBS_CONFIG["consumer_group"],
            eventhub_name=EVENT_HUBS_CONFIG["eventhub_name"],
        )
        logger.info("✅ Trial events consumer initialized")

    async def process_event(self, event: Dict[str, Any]) -> None:
        """
        Process a single trial event using the multi-agent system.

        This method extracts the trial event payload and hands it over to the
        coordinator for detailed analysis, connecting simulation with
        agent-based interpretation.

        Args:
            event: Decoded event payload. Its ``trialId`` entry, when present,
                is used for logging and recorded on the tracing span.

        Raises:
            Exception: Any error raised while processing is logged, recorded
                on the span, and re-raised unchanged.
        """
        with tracer.start_as_current_span("process_trial_event") as span:
            try:
                trial_id = event.get("trialId")
                # None is not a valid span attribute value, so only record the
                # id when the payload actually carries one.
                if trial_id is not None:
                    span.set_attribute("trial.id", trial_id)
                logger.info("🔄 Processing trial event: %s", trial_id)

                analysis = await self.coordinator.process_trial_event(event)

                logger.info("✅ Analysis completed for trial: %s", trial_id)
                logger.debug("Analysis results: %s", analysis)
                span.set_attribute("analysis.completed", True)
            except Exception as e:
                logger.exception("❌ Error processing trial event: %s", e)
                span.set_status(trace.Status(trace.StatusCode.ERROR))
                span.record_exception(e)
                raise

    async def start_receiving(self) -> None:
        """
        Start receiving events from Event Hub and processing them.

        Each event body is decoded as JSON, passed to ``process_event``, and
        then checkpointed. The client is used as an async context manager for
        the duration of the receive call.

        Raises:
            Exception: Any error escaping the receive call is logged, recorded
                on the span, and re-raised unchanged.
        """
        with tracer.start_as_current_span("receive_trial_events") as span:
            try:
                logger.info("🎯 Starting trial event consumer")
                async with self.consumer:

                    async def on_event(partition_context, event):
                        # Extract event data and forward for processing.
                        event_data = json.loads(event.body_as_str())
                        await self.process_event(event_data)
                        # NOTE(review): no checkpoint_store is passed when the
                        # client is built, so this checkpoint is presumably
                        # not persisted anywhere - confirm.
                        await partition_context.update_checkpoint(event)

                    await self.consumer.receive(
                        on_event=on_event,
                        # "-1" selects the beginning of the stream in
                        # azure-eventhub; "@latest" would deliver only new
                        # events. NOTE(review): the previous comment said
                        # "Start from end" - confirm which one is intended.
                        starting_position="-1",
                    )
            except Exception as e:
                logger.exception("❌ Error in event consumer: %s", e)
                span.set_status(trace.Status(trace.StatusCode.ERROR))
                span.record_exception(e)
                raise

    def close(self) -> None:
        """
        Close the consumer client.

        The client is the asyncio flavour of ``EventHubConsumerClient``, so
        its ``close()`` yields a coroutine that must be driven by an event
        loop. When a loop is already running the shutdown is scheduled on it;
        otherwise it is run to completion before returning.
        """
        if not self.consumer:
            return

        logger.info("🛑 Closing trial event consumer")
        pending = self.consumer.close()
        if not asyncio.iscoroutine(pending):
            # A synchronous close() has already done its work.
            return

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: drive the shutdown here.
            asyncio.run(pending)
        else:
            # Keep a reference so the task is not garbage-collected before it
            # finishes.
            self._close_task = loop.create_task(pending)