codex-rs/core/src/client_common.rs (64 lines of code) (raw):

pub use codex_api::ResponseEvent; use codex_config::types::Personality; use codex_protocol::error::Result; use codex_protocol::models::BaseInstructions; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::InterAgentCommunication; use codex_tools::ToolSpec; use futures::Stream; use serde_json::Value; use std::pin::Pin; use std::task::Context; use std::task::Poll; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; /// API request payload for a single model turn #[derive(Debug, Clone)] pub struct Prompt { /// Conversation context input items. pub input: Vec<ResponseItem>, /// Tools available to the model, including additional tools sourced from /// external MCP servers. pub(crate) tools: Vec<ToolSpec>, /// Whether parallel tool calls are permitted for this prompt. pub(crate) parallel_tool_calls: bool, pub base_instructions: BaseInstructions, /// Optionally specify the personality of the model. pub personality: Option<Personality>, /// Optional the output schema for the model's response. pub output_schema: Option<Value>, /// Whether the Responses API should strictly validate `output_schema`. pub output_schema_strict: bool, } impl Default for Prompt { fn default() -> Self { Self { input: Vec::new(), tools: Vec::new(), parallel_tool_calls: false, base_instructions: BaseInstructions::default(), personality: None, output_schema: None, output_schema_strict: true, } } } impl Prompt { pub(crate) fn get_formatted_input(&self) -> Vec<ResponseItem> { self.input .iter() .cloned() .map(|item| { let ResponseItem::Message { role, content, .. } = &item else { return item; }; if role != "assistant" { return item; } InterAgentCommunication::from_message_content(content) .filter(|communication| communication.encrypted_content.is_some()) .map(|communication| communication.to_model_input_item()) .unwrap_or(item) }) .collect() } } pub struct ResponseStream { pub(crate) rx_event: mpsc::Receiver<Result<ResponseEvent>>, /// Signals the mapper task that the consumer stopped polling before the /// provider stream reached its own terminal event. pub(crate) consumer_dropped: CancellationToken, } impl Stream for ResponseStream { type Item = Result<ResponseEvent>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { self.rx_event.poll_recv(cx) } } impl Drop for ResponseStream { fn drop(&mut self) { self.consumer_dropped.cancel(); } } #[cfg(test)] #[path = "client_common_tests.rs"] mod tests;