fn apply_event()

in codex-rs/rollout-trace/src/reducer/mod.rs [149:480]


    fn apply_event(&mut self, event: RawTraceEvent) -> Result<()> {
        // Raw payload refs are reducer-wide evidence, not owned by a single
        // semantic arm. Keep this bookkeeping separate so typed reduction can
        // stay strict without duplicating payload insertion in every case.
        for payload in event.payload.raw_payload_refs() {
            self.insert_raw_payload(payload);
        }

        match event.payload {
            RawTraceEventPayload::RolloutStarted {
                trace_id,
                root_thread_id,
            } => {
                self.rollout.trace_id = trace_id;
                self.rollout.root_thread_id = root_thread_id;
            }
            RawTraceEventPayload::RolloutEnded { status } => {
                self.rollout.status = status;
                self.rollout.ended_at_unix_ms = Some(event.wall_time_unix_ms);
            }
            RawTraceEventPayload::ThreadStarted {
                thread_id,
                agent_path,
                metadata_payload,
            } => {
                self.start_thread(
                    event.seq,
                    event.wall_time_unix_ms,
                    thread_id,
                    agent_path,
                    metadata_payload,
                )?;
            }
            RawTraceEventPayload::ThreadEnded { thread_id, status } => {
                self.end_thread(event.seq, event.wall_time_unix_ms, thread_id, status)?;
            }
            RawTraceEventPayload::CodexTurnStarted {
                codex_turn_id,
                thread_id,
            } => {
                self.start_codex_turn(
                    event.seq,
                    event.wall_time_unix_ms,
                    codex_turn_id,
                    thread_id,
                )?;
            }
            RawTraceEventPayload::CodexTurnEnded {
                codex_turn_id,
                status,
            } => {
                self.end_codex_turn(
                    event.seq,
                    event.wall_time_unix_ms,
                    event.thread_id,
                    codex_turn_id,
                    status,
                )?;
            }
            RawTraceEventPayload::InferenceStarted {
                inference_call_id,
                thread_id,
                codex_turn_id,
                model,
                provider_name,
                request_payload,
            } => {
                self.start_inference_call(
                    event.seq,
                    event.wall_time_unix_ms,
                    StartedInferenceCall {
                        inference_call_id,
                        thread_id,
                        codex_turn_id,
                        model,
                        provider_name,
                        request_payload,
                    },
                )?;
            }
            payload @ (RawTraceEventPayload::InferenceCompleted { .. }
            | RawTraceEventPayload::InferenceFailed { .. }
            | RawTraceEventPayload::InferenceCancelled { .. }) => {
                self.complete_inference_call(event.seq, event.wall_time_unix_ms, payload)?;
            }
            RawTraceEventPayload::ProtocolEventObserved { .. } => {
                // Protocol wrappers are raw debug breadcrumbs. Typed hooks own
                // the reduced graph, so these payload refs are retained without
                // creating semantic objects.
            }
            RawTraceEventPayload::ToolCallStarted {
                tool_call_id,
                model_visible_call_id,
                code_mode_runtime_tool_id,
                requester,
                kind,
                summary,
                invocation_payload,
            } => {
                self.start_tool_call(
                    event.seq,
                    event.wall_time_unix_ms,
                    event.thread_id,
                    event.codex_turn_id,
                    ToolCallStarted {
                        tool_call_id,
                        model_visible_call_id,
                        code_mode_runtime_tool_id,
                        requester,
                        kind,
                        summary,
                        invocation_payload,
                    },
                )?;
            }
            RawTraceEventPayload::McpToolCallCorrelationAssigned {
                tool_call_id,
                mcp_call_id,
            } => {
                self.assign_mcp_tool_call_correlation(tool_call_id, mcp_call_id)?;
            }
            RawTraceEventPayload::ToolCallRuntimeStarted {
                tool_call_id,
                runtime_payload,
            } => {
                self.start_tool_runtime_observation(
                    event.seq,
                    event.wall_time_unix_ms,
                    tool_call_id,
                    runtime_payload,
                )?;
            }
            RawTraceEventPayload::ToolCallRuntimeEnded {
                tool_call_id,
                status,
                runtime_payload,
            } => {
                self.end_tool_runtime_observation(
                    event.seq,
                    event.wall_time_unix_ms,
                    tool_call_id,
                    status,
                    runtime_payload,
                )?;
            }
            RawTraceEventPayload::ToolCallEnded {
                tool_call_id,
                status,
                result_payload,
            } => {
                self.end_tool_call(
                    event.seq,
                    event.wall_time_unix_ms,
                    tool_call_id,
                    status,
                    result_payload,
                )?;
            }
            RawTraceEventPayload::CodeCellStarted {
                runtime_cell_id,
                model_visible_call_id,
                source_js,
            } => {
                let thread_id = self.code_cell_event_thread_id(
                    event.thread_id,
                    event.codex_turn_id.as_deref(),
                    &runtime_cell_id,
                    "code cell start",
                )?;
                let reduced_code_cell_id =
                    self.reduced_code_cell_id_for_model_visible_call(&model_visible_call_id);
                self.record_runtime_code_cell_id(
                    &thread_id,
                    &runtime_cell_id,
                    &reduced_code_cell_id,
                )?;
                self.start_or_queue_code_cell(PendingCodeCellStart {
                    seq: event.seq,
                    wall_time_unix_ms: event.wall_time_unix_ms,
                    thread_id,
                    codex_turn_id: event.codex_turn_id,
                    started: StartedCodeCell {
                        code_cell_id: reduced_code_cell_id,
                        runtime_cell_id,
                        model_visible_call_id,
                        source_js,
                    },
                })?;
            }
            RawTraceEventPayload::CodeCellInitialResponse {
                runtime_cell_id,
                status,
                ..
            } => {
                let thread_id = self.code_cell_event_thread_id(
                    event.thread_id,
                    event.codex_turn_id.as_deref(),
                    &runtime_cell_id,
                    "code cell initial response",
                )?;
                let code_cell_id = self.code_cell_id_for_runtime_cell_id(
                    &thread_id,
                    &runtime_cell_id,
                    "code cell initial response",
                )?;
                self.record_or_queue_code_cell_initial_response(
                    event.seq,
                    event.wall_time_unix_ms,
                    code_cell_id,
                    runtime_cell_id,
                    status,
                )?;
            }
            RawTraceEventPayload::CodeCellEnded {
                runtime_cell_id,
                status,
                ..
            } => {
                let thread_id = self.code_cell_event_thread_id(
                    event.thread_id,
                    event.codex_turn_id.as_deref(),
                    &runtime_cell_id,
                    "code cell end",
                )?;
                let code_cell_id = self.code_cell_id_for_runtime_cell_id(
                    &thread_id,
                    &runtime_cell_id,
                    "code cell end",
                )?;
                self.end_or_queue_code_cell(
                    event.seq,
                    event.wall_time_unix_ms,
                    code_cell_id,
                    status,
                )?;
            }
            RawTraceEventPayload::CompactionRequestStarted {
                compaction_id,
                compaction_request_id,
                thread_id,
                codex_turn_id,
                model,
                provider_name,
                request_payload,
            } => {
                self.start_compaction_request(
                    event.seq,
                    event.wall_time_unix_ms,
                    StartedCompactionRequest {
                        compaction_id,
                        compaction_request_id,
                        thread_id,
                        codex_turn_id,
                        model,
                        provider_name,
                        request_payload,
                    },
                )?;
            }
            RawTraceEventPayload::CompactionRequestCompleted {
                compaction_id,
                compaction_request_id,
                response_payload,
            } => {
                self.complete_compaction_request(
                    event.seq,
                    event.wall_time_unix_ms,
                    compaction_id,
                    compaction_request_id,
                    ExecutionStatus::Completed,
                    Some(response_payload),
                )?;
            }
            RawTraceEventPayload::CompactionRequestFailed {
                compaction_id,
                compaction_request_id,
                ..
            } => {
                self.complete_compaction_request(
                    event.seq,
                    event.wall_time_unix_ms,
                    compaction_id,
                    compaction_request_id,
                    ExecutionStatus::Failed,
                    /*response_payload*/ None,
                )?;
            }
            RawTraceEventPayload::CompactionInstalled {
                compaction_id,
                checkpoint_payload,
            } => {
                let Some(thread_id) = event.thread_id else {
                    bail!("compaction installed event {compaction_id} did not include a thread id");
                };
                let Some(codex_turn_id) = event.codex_turn_id else {
                    bail!(
                        "compaction installed event {compaction_id} did not include a codex turn id"
                    );
                };
                self.reduce_compaction_installed_event(
                    event.wall_time_unix_ms,
                    thread_id,
                    codex_turn_id,
                    compaction_id,
                    checkpoint_payload,
                )?;
            }
            RawTraceEventPayload::AgentResultObserved {
                edge_id,
                child_thread_id,
                child_codex_turn_id,
                parent_thread_id,
                message,
                carried_payload,
            } => {
                self.queue_agent_result_interaction_edge(ObservedAgentResultEdge {
                    wall_time_unix_ms: event.wall_time_unix_ms,
                    edge_id,
                    child_thread_id,
                    child_codex_turn_id,
                    parent_thread_id,
                    message,
                    carried_payload,
                })?;
            }
            RawTraceEventPayload::Other { .. } => {
                bail!("raw trace event has no reducer implementation");
            }
        }

        Ok(())
    }