in codex-rs/core/src/client.rs [1759:1904]
fn map_response_events<S>(
upstream_request_id: Option<String>,
api_stream: S,
session_telemetry: SessionTelemetry,
inference_trace_attempt: InferenceTraceAttempt,
) -> (ResponseStream, oneshot::Receiver<LastResponse>)
where
S: futures::Stream<Item = std::result::Result<ResponseEvent, ApiError>>
+ Unpin
+ Send
+ 'static,
{
let (tx_event, rx_event) =
mpsc::channel::<Result<ResponseEvent>>(RESPONSE_STREAM_CHANNEL_CAPACITY);
let (tx_last_response, rx_last_response) = oneshot::channel::<LastResponse>();
let consumer_dropped = CancellationToken::new();
let consumer_dropped_for_stream = consumer_dropped.clone();
tokio::spawn(async move {
let mut logged_error = false;
let mut tx_last_response = Some(tx_last_response);
let mut items_added: Vec<ResponseItem> = Vec::new();
let mut api_stream = api_stream;
let upstream_request_id = upstream_request_id.as_deref();
if let Some(upstream_request_id) = upstream_request_id {
feedback_tags!(last_model_request_id = upstream_request_id);
}
loop {
let event = tokio::select! {
_ = consumer_dropped.cancelled() => {
inference_trace_attempt.record_cancelled(
STREAM_DROPPED_REASON,
upstream_request_id,
&items_added,
);
return;
}
event = api_stream.next() => event,
};
let Some(event) = event else {
break;
};
match event {
Ok(ResponseEvent::OutputItemDone(item)) => {
items_added.push(item.clone());
if tx_event
.send(Ok(ResponseEvent::OutputItemDone(item)))
.await
.is_err()
{
inference_trace_attempt.record_cancelled(
STREAM_DROPPED_REASON,
upstream_request_id,
&items_added,
);
return;
}
}
Ok(ResponseEvent::Completed {
response_id,
token_usage,
end_turn,
}) => {
feedback_tags!(last_model_response_id = &response_id);
if let Some(usage) = &token_usage {
session_telemetry.sse_event_completed(
usage.input_tokens,
usage.output_tokens,
Some(usage.cached_input_tokens),
Some(usage.reasoning_output_tokens),
usage.total_tokens,
);
}
inference_trace_attempt.record_completed(
&response_id,
upstream_request_id,
&token_usage,
&items_added,
);
if let Some(sender) = tx_last_response.take() {
let _ = sender.send(LastResponse {
response_id: response_id.clone(),
items_added: std::mem::take(&mut items_added),
});
}
if tx_event
.send(Ok(ResponseEvent::Completed {
response_id,
token_usage,
end_turn,
}))
.await
.is_err()
{
return;
}
}
Ok(event) => {
if tx_event.send(Ok(event)).await.is_err() {
inference_trace_attempt.record_cancelled(
STREAM_DROPPED_REASON,
upstream_request_id,
&items_added,
);
return;
}
}
Err(err) => {
let response_debug_context =
extract_response_debug_context_from_api_error(&err);
let upstream_request_id =
upstream_request_id.or(response_debug_context.request_id.as_deref());
if let Some(upstream_request_id) = upstream_request_id {
feedback_tags!(last_model_request_id = upstream_request_id);
}
let mapped = map_api_error(err);
inference_trace_attempt.record_failed(
&mapped,
upstream_request_id,
&items_added,
);
if !logged_error {
session_telemetry.see_event_completed_failed(&mapped);
logged_error = true;
}
if tx_event.send(Err(mapped)).await.is_err() {
return;
}
}
}
}
inference_trace_attempt.record_failed(
"stream closed before response.completed",
upstream_request_id,
&items_added,
);
});
(
ResponseStream {
rx_event,
consumer_dropped: consumer_dropped_for_stream,
},
rx_last_response,
)
}