codex-rs/core/src/client_common.rs (64 lines of code) (raw):
pub use codex_api::ResponseEvent;
use codex_config::types::Personality;
use codex_protocol::error::Result;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InterAgentCommunication;
use codex_tools::ToolSpec;
use futures::Stream;
use serde_json::Value;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// API request payload for a single model turn.
///
/// Bundles the conversation input, the tool specifications, the instructions,
/// and the output-schema settings that make up one request.
#[derive(Debug, Clone)]
pub struct Prompt {
/// Conversation context input items.
pub input: Vec<ResponseItem>,
/// Tools available to the model, including additional tools sourced from
/// external MCP servers.
pub(crate) tools: Vec<ToolSpec>,
/// Whether parallel tool calls are permitted for this prompt.
pub(crate) parallel_tool_calls: bool,
/// Base instructions associated with this prompt.
pub base_instructions: BaseInstructions,
/// Optionally specify the personality of the model.
pub personality: Option<Personality>,
/// Optional output schema for the model's response.
pub output_schema: Option<Value>,
/// Whether the Responses API should strictly validate `output_schema`.
pub output_schema_strict: bool,
}
impl Default for Prompt {
fn default() -> Self {
Self {
input: Vec::new(),
tools: Vec::new(),
parallel_tool_calls: false,
base_instructions: BaseInstructions::default(),
personality: None,
output_schema: None,
output_schema_strict: true,
}
}
}
impl Prompt {
    /// Returns the input items to send, rewriting inter-agent messages.
    ///
    /// Every item of `self.input` is copied into the result unchanged, except
    /// for `ResponseItem::Message` items whose role is `"assistant"` and whose
    /// content parses (via `InterAgentCommunication::from_message_content`)
    /// into a communication that carries `encrypted_content`. Those items are
    /// replaced by the communication's `to_model_input_item()`.
    ///
    /// The result has the same length and order as `self.input`.
    pub(crate) fn get_formatted_input(&self) -> Vec<ResponseItem> {
        self.input
            .iter()
            .map(|item| {
                // Only assistant messages are candidates for rewriting; every
                // other item yields `None` here and is copied through below.
                let rewritten = match item {
                    ResponseItem::Message { role, content, .. } if role == "assistant" => {
                        InterAgentCommunication::from_message_content(content)
                            .filter(|communication| communication.encrypted_content.is_some())
                            .map(|communication| communication.to_model_input_item())
                    }
                    _ => None,
                };
                // Clone lazily so that items which get replaced are never
                // cloned just to be thrown away.
                rewritten.unwrap_or_else(|| item.clone())
            })
            .collect()
    }
}
/// Stream of [`ResponseEvent`] results delivered over an in-process channel.
///
/// Polling the stream polls `rx_event`; dropping the stream cancels
/// `consumer_dropped`.
pub struct ResponseStream {
/// Receiving half of the channel that carries events (or errors) to the
/// consumer of this stream.
pub(crate) rx_event: mpsc::Receiver<Result<ResponseEvent>>,
/// Signals the mapper task that the consumer stopped polling before the
/// provider stream reached its own terminal event.
pub(crate) consumer_dropped: CancellationToken,
}
impl Stream for ResponseStream {
    type Item = Result<ResponseEvent>;

    /// Forwards the poll to the underlying channel receiver, so the stream
    /// yields whatever `rx_event` yields and ends when `rx_event` reports
    /// `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Obtain a plain mutable reference explicitly rather than relying on
        // `DerefMut` through the pinned handle.
        let stream = self.get_mut();
        stream.rx_event.poll_recv(cx)
    }
}
impl Drop for ResponseStream {
    /// Cancels `consumer_dropped` when the stream is dropped.
    fn drop(&mut self) {
        let token = &self.consumer_dropped;
        // Fires on every drop; whoever holds the token observes the
        // cancellation.
        token.cancel();
    }
}
// Test-only module; its body lives in the file named by `#[path]` below
// rather than inline.
#[cfg(test)]
#[path = "client_common_tests.rs"]
mod tests;