server/app/utils.py (147 lines of code) (raw):

"""Helpers that translate between the client websocket message format and
OpenAI Agents SDK stream events (text, run items, agent updates, voice audio)."""

import base64
import json

import numpy as np
from agents import (
    Agent,
    RunItemStreamEvent,
    RawResponsesStreamEvent,
    AgentUpdatedStreamEvent,
)
from agents.voice import AudioInput, VoiceStreamEvent, VoiceStreamEventAudio
from fastapi import WebSocket
from openai.types.responses import ResponseTextDeltaEvent


def transform_data_to_events(audio_np: np.ndarray) -> dict:
    """Wrap an audio array in a ``response.audio.delta`` event dict.

    The array's raw bytes (in whatever dtype it already has) are
    base64-encoded into ``delta``; the index and id fields are fixed
    placeholder values.
    """
    return {
        "type": "response.audio.delta",
        "delta": base64.b64encode(audio_np.tobytes()).decode("utf-8"),
        "output_index": 0,
        "content_index": 0,
        "item_id": "",
        "response_id": "",
        "event_id": "",
    }


def is_new_output_item(event) -> bool:
    """Return True if ``event`` is a ``RunItemStreamEvent``."""
    return isinstance(event, RunItemStreamEvent)


def is_text_output(event) -> bool:
    """Return True if ``event`` is a raw response event carrying a text delta."""
    return event.type == "raw_response_event" and isinstance(
        event.data, ResponseTextDeltaEvent
    )


def is_sync_message(data) -> bool:
    """Return True for a ``history.update`` message with no new user turn.

    That is: ``inputs`` is empty, or its last entry's ``role`` is not
    ``"user"``. Messages without a ``type`` key are not matched (client
    messages are untrusted, so a missing key must not raise ``KeyError``).
    """
    return data.get("type") == "history.update" and (
        not data["inputs"] or data["inputs"][-1].get("role") != "user"
    )


def is_new_text_message(data) -> bool:
    """Return True for a ``history.update`` whose last input has role ``"user"``.

    Always returns a real ``bool`` (the previous form could return the empty
    ``inputs`` list itself). Messages without a ``type`` key are not matched.
    """
    return bool(
        data.get("type") == "history.update"
        and data["inputs"]
        and data["inputs"][-1].get("role") == "user"
    )


def process_inputs(data, connection) -> str:
    """Store all but the newest input on ``connection.history`` and return the
    newest input's ``content``.

    Expects ``data["inputs"]`` to be non-empty (check with
    ``is_new_text_message`` first); an empty list raises ``IndexError``.
    NOTE(review): ``content`` is returned as-is; if clients can send
    structured content parts it will not actually be a ``str`` — confirm.
    """
    connection.history = data["inputs"][:-1]
    return data["inputs"][-1]["content"]


def is_new_audio_chunk(data) -> bool:
    """Return True for an ``input_audio_buffer.append`` message."""
    return data.get("type") == "input_audio_buffer.append"


def is_audio_complete(data) -> bool:
    """Return True for an ``input_audio_buffer.commit`` message."""
    return data.get("type") == "input_audio_buffer.commit"


def extract_audio_chunk(data):
    """Decode the base64 ``delta`` of an audio message into float32 samples.

    The decoded bytes are read as native-byte-order int16 samples and scaled
    by 1/32768, giving values in [-1.0, 1.0). An odd number of decoded bytes
    makes ``np.frombuffer`` raise ``ValueError``.
    """
    decoded_bytes = base64.b64decode(data["delta"])
    audio_int16 = np.frombuffer(decoded_bytes, dtype=np.int16)
    # 32768 = 2**15, the magnitude of the most negative int16 value.
    audio_data = audio_int16.astype(np.float32) / 32768.0
    return audio_data


def concat_audio_chunks(chunks) -> AudioInput:
    """Concatenate decoded audio chunks into a single ``AudioInput``.

    ``np.concatenate`` raises ``ValueError`` if ``chunks`` is empty.
    """
    return AudioInput(np.concatenate(chunks))


class WebsocketHelper:
    """Per-websocket state: conversation history, the current agent and the
    assistant text streamed so far. Pushes ``history.updated`` (and audio)
    events to the client as JSON text frames."""

    def __init__(
        self, websocket: WebSocket, history: list | None, initial_agent: Agent
    ):
        self.websocket = websocket
        # A falsy history (None or an empty list) is replaced by a fresh list.
        self.history = history or []
        self.latest_agent = initial_agent
        # Assistant text accumulated from text deltas of the message currently
        # being streamed; it is not part of self.history yet.
        self.partial_response = ""

    async def show_user_input(self, user_input: str):
        """Append ``user_input`` as a user message, send the updated history and
        return ``(history, latest_agent)``."""
        self.history.append(
            {
                "type": "message",
                "role": "user",
                "content": user_input,
            }
        )
        await self.websocket.send_text(
            json.dumps(
                {
                    "type": "history.updated",
                    "reason": "user.input",
                    "inputs": self.history,
                    "agent_name": self.latest_agent.name,
                }
            )
        )
        return (self.history, self.latest_agent)

    async def stream_response(self, new_tokens: str, is_text: bool = False):
        """Append ``new_tokens`` to the in-progress assistant message and send
        the history plus that partial message.

        When ``is_text`` is True the call is a no-op.
        NOTE(review): the purpose of the ``is_text`` flag is not evident here.
        """
        if is_text:
            return

        self.partial_response += new_tokens

        await self.websocket.send_text(
            json.dumps(
                {
                    "type": "history.updated",
                    "reason": "response.text.delta",
                    "inputs": self.history
                    + [
                        {
                            "type": "message",
                            "role": "assistant",
                            "content": self.partial_response,
                        }
                    ],
                    "agent_name": self.latest_agent.name,
                }
            )
        )

    async def handle_new_item(
        self,
        event: RawResponsesStreamEvent | RunItemStreamEvent | AgentUpdatedStreamEvent,
    ):
        """Forward one event of a streamed agent run to the client.

        - Run items are appended to history and the full history is sent. A
          ``message_output_created`` item commits the message whose deltas were
          being streamed, so ``partial_response`` is cleared; otherwise deltas
          of a later message in the same run would be appended to the old
          text and duplicate it in the preview.
        - Text deltas are forwarded through ``stream_response``.
        - Agent-updated events replace ``latest_agent`` with ``new_agent`` so
          subsequent updates report the current agent's name.
        """
        if is_new_output_item(event):
            if event.name == "message_output_created":  # type: ignore
                self.partial_response = ""
            self.history.append(event.item.to_input_item())  # type: ignore
            await self.websocket.send_text(
                json.dumps(
                    {
                        "type": "history.updated",
                        "reason": "response.input_item",
                        "inputs": self.history,
                        "agent_name": self.latest_agent.name,
                    }
                )
            )
        elif is_text_output(event):
            await self.stream_response(event.data.delta)  # type: ignore
        elif isinstance(event, AgentUpdatedStreamEvent):
            self.latest_agent = event.new_agent

    async def text_output_complete(self, output, is_done=False):
        """Send the current history to the client.

        If ``is_done`` is False, send a ``"sync": True`` snapshot of the current
        history. If True, clear the partial text, adopt ``output.last_agent``
        and ``output.to_input_list()`` as the new agent/history, then send
        them with reason ``response.done``.
        NOTE(review): ``output`` is presumably the agents run result — confirm.
        """
        if not is_done:
            await self.websocket.send_text(
                json.dumps(
                    {
                        "type": "history.updated",
                        "inputs": self.history,
                        "sync": True,
                        "agent_name": self.latest_agent.name,
                    }
                )
            )
        else:
            self.partial_response = ""
            self.latest_agent = output.last_agent
            self.history = output.to_input_list()
            await self.websocket.send_text(
                json.dumps(
                    {
                        "type": "history.updated",
                        "inputs": self.history,
                        "reason": "response.done",
                        "agent_name": self.latest_agent.name,
                    }
                )
            )

    async def send_audio_chunk(self, event: VoiceStreamEvent):
        """Forward an audio voice event as a ``response.audio.delta`` message.

        Non-audio events are ignored, as are audio events whose ``data`` is
        None (the field is optional, and ``.tobytes()`` would fail on it).
        """
        if isinstance(event, VoiceStreamEventAudio) and event.data is not None:
            await self.websocket.send_text(
                json.dumps(transform_data_to_events(event.data))
            )

    async def send_audio_done(self):
        """Tell the client that audio output has finished."""
        await self.websocket.send_text(json.dumps({"type": "audio.done"}))