in codex-rs/rollout-trace/src/reducer/mod.rs [149:480]
fn apply_event(&mut self, event: RawTraceEvent) -> Result<()> {
// Raw payload refs are reducer-wide evidence, not owned by a single
// semantic arm. Keep this bookkeeping separate so typed reduction can
// stay strict without duplicating payload insertion in every case.
for payload in event.payload.raw_payload_refs() {
self.insert_raw_payload(payload);
}
match event.payload {
RawTraceEventPayload::RolloutStarted {
trace_id,
root_thread_id,
} => {
self.rollout.trace_id = trace_id;
self.rollout.root_thread_id = root_thread_id;
}
RawTraceEventPayload::RolloutEnded { status } => {
self.rollout.status = status;
self.rollout.ended_at_unix_ms = Some(event.wall_time_unix_ms);
}
RawTraceEventPayload::ThreadStarted {
thread_id,
agent_path,
metadata_payload,
} => {
self.start_thread(
event.seq,
event.wall_time_unix_ms,
thread_id,
agent_path,
metadata_payload,
)?;
}
RawTraceEventPayload::ThreadEnded { thread_id, status } => {
self.end_thread(event.seq, event.wall_time_unix_ms, thread_id, status)?;
}
RawTraceEventPayload::CodexTurnStarted {
codex_turn_id,
thread_id,
} => {
self.start_codex_turn(
event.seq,
event.wall_time_unix_ms,
codex_turn_id,
thread_id,
)?;
}
RawTraceEventPayload::CodexTurnEnded {
codex_turn_id,
status,
} => {
self.end_codex_turn(
event.seq,
event.wall_time_unix_ms,
event.thread_id,
codex_turn_id,
status,
)?;
}
RawTraceEventPayload::InferenceStarted {
inference_call_id,
thread_id,
codex_turn_id,
model,
provider_name,
request_payload,
} => {
self.start_inference_call(
event.seq,
event.wall_time_unix_ms,
StartedInferenceCall {
inference_call_id,
thread_id,
codex_turn_id,
model,
provider_name,
request_payload,
},
)?;
}
payload @ (RawTraceEventPayload::InferenceCompleted { .. }
| RawTraceEventPayload::InferenceFailed { .. }
| RawTraceEventPayload::InferenceCancelled { .. }) => {
self.complete_inference_call(event.seq, event.wall_time_unix_ms, payload)?;
}
RawTraceEventPayload::ProtocolEventObserved { .. } => {
// Protocol wrappers are raw debug breadcrumbs. Typed hooks own
// the reduced graph, so these payload refs are retained without
// creating semantic objects.
}
RawTraceEventPayload::ToolCallStarted {
tool_call_id,
model_visible_call_id,
code_mode_runtime_tool_id,
requester,
kind,
summary,
invocation_payload,
} => {
self.start_tool_call(
event.seq,
event.wall_time_unix_ms,
event.thread_id,
event.codex_turn_id,
ToolCallStarted {
tool_call_id,
model_visible_call_id,
code_mode_runtime_tool_id,
requester,
kind,
summary,
invocation_payload,
},
)?;
}
RawTraceEventPayload::McpToolCallCorrelationAssigned {
tool_call_id,
mcp_call_id,
} => {
self.assign_mcp_tool_call_correlation(tool_call_id, mcp_call_id)?;
}
RawTraceEventPayload::ToolCallRuntimeStarted {
tool_call_id,
runtime_payload,
} => {
self.start_tool_runtime_observation(
event.seq,
event.wall_time_unix_ms,
tool_call_id,
runtime_payload,
)?;
}
RawTraceEventPayload::ToolCallRuntimeEnded {
tool_call_id,
status,
runtime_payload,
} => {
self.end_tool_runtime_observation(
event.seq,
event.wall_time_unix_ms,
tool_call_id,
status,
runtime_payload,
)?;
}
RawTraceEventPayload::ToolCallEnded {
tool_call_id,
status,
result_payload,
} => {
self.end_tool_call(
event.seq,
event.wall_time_unix_ms,
tool_call_id,
status,
result_payload,
)?;
}
RawTraceEventPayload::CodeCellStarted {
runtime_cell_id,
model_visible_call_id,
source_js,
} => {
let thread_id = self.code_cell_event_thread_id(
event.thread_id,
event.codex_turn_id.as_deref(),
&runtime_cell_id,
"code cell start",
)?;
let reduced_code_cell_id =
self.reduced_code_cell_id_for_model_visible_call(&model_visible_call_id);
self.record_runtime_code_cell_id(
&thread_id,
&runtime_cell_id,
&reduced_code_cell_id,
)?;
self.start_or_queue_code_cell(PendingCodeCellStart {
seq: event.seq,
wall_time_unix_ms: event.wall_time_unix_ms,
thread_id,
codex_turn_id: event.codex_turn_id,
started: StartedCodeCell {
code_cell_id: reduced_code_cell_id,
runtime_cell_id,
model_visible_call_id,
source_js,
},
})?;
}
RawTraceEventPayload::CodeCellInitialResponse {
runtime_cell_id,
status,
..
} => {
let thread_id = self.code_cell_event_thread_id(
event.thread_id,
event.codex_turn_id.as_deref(),
&runtime_cell_id,
"code cell initial response",
)?;
let code_cell_id = self.code_cell_id_for_runtime_cell_id(
&thread_id,
&runtime_cell_id,
"code cell initial response",
)?;
self.record_or_queue_code_cell_initial_response(
event.seq,
event.wall_time_unix_ms,
code_cell_id,
runtime_cell_id,
status,
)?;
}
RawTraceEventPayload::CodeCellEnded {
runtime_cell_id,
status,
..
} => {
let thread_id = self.code_cell_event_thread_id(
event.thread_id,
event.codex_turn_id.as_deref(),
&runtime_cell_id,
"code cell end",
)?;
let code_cell_id = self.code_cell_id_for_runtime_cell_id(
&thread_id,
&runtime_cell_id,
"code cell end",
)?;
self.end_or_queue_code_cell(
event.seq,
event.wall_time_unix_ms,
code_cell_id,
status,
)?;
}
RawTraceEventPayload::CompactionRequestStarted {
compaction_id,
compaction_request_id,
thread_id,
codex_turn_id,
model,
provider_name,
request_payload,
} => {
self.start_compaction_request(
event.seq,
event.wall_time_unix_ms,
StartedCompactionRequest {
compaction_id,
compaction_request_id,
thread_id,
codex_turn_id,
model,
provider_name,
request_payload,
},
)?;
}
RawTraceEventPayload::CompactionRequestCompleted {
compaction_id,
compaction_request_id,
response_payload,
} => {
self.complete_compaction_request(
event.seq,
event.wall_time_unix_ms,
compaction_id,
compaction_request_id,
ExecutionStatus::Completed,
Some(response_payload),
)?;
}
RawTraceEventPayload::CompactionRequestFailed {
compaction_id,
compaction_request_id,
..
} => {
self.complete_compaction_request(
event.seq,
event.wall_time_unix_ms,
compaction_id,
compaction_request_id,
ExecutionStatus::Failed,
/*response_payload*/ None,
)?;
}
RawTraceEventPayload::CompactionInstalled {
compaction_id,
checkpoint_payload,
} => {
let Some(thread_id) = event.thread_id else {
bail!("compaction installed event {compaction_id} did not include a thread id");
};
let Some(codex_turn_id) = event.codex_turn_id else {
bail!(
"compaction installed event {compaction_id} did not include a codex turn id"
);
};
self.reduce_compaction_installed_event(
event.wall_time_unix_ms,
thread_id,
codex_turn_id,
compaction_id,
checkpoint_payload,
)?;
}
RawTraceEventPayload::AgentResultObserved {
edge_id,
child_thread_id,
child_codex_turn_id,
parent_thread_id,
message,
carried_payload,
} => {
self.queue_agent_result_interaction_edge(ObservedAgentResultEdge {
wall_time_unix_ms: event.wall_time_unix_ms,
edge_id,
child_thread_id,
child_codex_turn_id,
parent_thread_id,
message,
carried_payload,
})?;
}
RawTraceEventPayload::Other { .. } => {
bail!("raw trace event has no reducer implementation");
}
}
Ok(())
}