fn parity_check_from_scan_and_rows()

in codex-rs/cli/src/doctor/thread_inventory.rs [212:421]


/// Compares the rollout files found by a scan with the state DB thread rows and
/// folds the outcome into a single [`DoctorCheck`].
///
/// Count lines and path/id samples are appended to `details` (which the caller may
/// have pre-populated) and attached to the returned check. The check is `Ok` only
/// when the scan reported no errors, no malformed names, did not reach its cap, and
/// no missing, stale, mismatched, or duplicate entries were found. Otherwise it is a
/// `Warning` carrying one issue per class of problem detected.
///
/// Stale-row and archive-mismatch detection are skipped when the scan reached its
/// cap; both lists are then empty.
fn parity_check_from_scan_and_rows(
    codex_home: &Path,
    scan: RolloutScan,
    rows: Vec<ThreadStateAuditRow>,
    mut details: Vec<String>,
) -> DoctorCheck {
    // Index the scanned rollout files by key; a later file replaces an earlier one
    // that shares the same key.
    let mut rollout_by_key = HashMap::new();
    for file in &scan.files {
        rollout_by_key.insert(file.key.clone(), file);
    }

    // Group DB rows by the key derived from their rollout path.
    let mut rows_by_key: HashMap<PathBuf, Vec<&ThreadStateAuditRow>> = HashMap::new();
    for row in rows.iter() {
        let bucket = rows_by_key.entry(path_key(&row.rollout_path)).or_default();
        bucket.push(row);
    }

    let missing_active = missing_rollout_paths(&scan.files, &rows_by_key, /*archived*/ false);
    let missing_archived = missing_rollout_paths(&scan.files, &rows_by_key, /*archived*/ true);

    let scan_complete = !scan.reached_scan_cap;
    let mut stale_rows: Vec<&ThreadStateAuditRow> = Vec::new();
    let mut archive_mismatches: Vec<&ThreadStateAuditRow> = Vec::new();
    if scan_complete {
        // Rows whose rollout path is not a regular file on disk.
        stale_rows.extend(rows.iter().filter(|row| !row.rollout_path.is_file()));

        // Rows whose archived flag disagrees with the expected one. The scanned file
        // is preferred; the path itself is consulted only when the scan has no entry
        // for the row and the path is a file on disk.
        for row in &rows {
            let expected_archived = match rollout_by_key.get(&path_key(&row.rollout_path)) {
                Some(file) => Some(file.archived),
                None if row.rollout_path.is_file() => {
                    archived_from_rollout_path(codex_home, &row.rollout_path)
                }
                None => None,
            };
            if matches!(expected_archived, Some(expected) if expected != row.archived) {
                archive_mismatches.push(row);
            }
        }
    }

    let duplicate_thread_ids = duplicate_rollout_thread_ids(&scan.files);
    let duplicate_paths = duplicate_db_paths(&rows_by_key);
    let archived_rows = rows.iter().filter(|row| row.archived).count();
    let active_rows = rows.len() - archived_rows;

    // Count lines, in the order they appear in the report.
    details.push(format!("rollout DB rows: {}", rows.len()));
    details.push(format!("rollout DB active rows: {active_rows}"));
    details.push(format!("rollout DB archived rows: {archived_rows}"));
    details.push(format!(
        "rollout DB missing active rows: {}",
        missing_active.len()
    ));
    details.push(format!(
        "rollout DB missing archived rows: {}",
        missing_archived.len()
    ));
    details.push(format!(
        "rollout DB stale rows: {}",
        count_or_skipped(stale_rows.len(), scan_complete)
    ));
    details.push(format!(
        "rollout DB archive mismatches: {}",
        count_or_skipped(archive_mismatches.len(), scan_complete)
    ));
    details.push(format!(
        "rollout DB duplicate rollout thread ids: {}",
        duplicate_thread_ids.len()
    ));
    details.push(format!(
        "rollout DB duplicate DB paths: {}",
        duplicate_paths.len()
    ));
    details.push(format!(
        "rollout DB model providers: {}",
        count_summary(rows.iter().map(|row| row.model_provider.as_str()))
    ));
    details.push(format!(
        "rollout DB sources: {}",
        count_summary(rows.iter().map(|row| source_category(&row.source)))
    ));

    // Samples for each problem class, after all count lines.
    push_path_samples(
        &mut details,
        "rollout DB missing active sample",
        missing_active.iter().copied(),
    );
    push_path_samples(
        &mut details,
        "rollout DB missing archived sample",
        missing_archived.iter().copied(),
    );
    push_path_samples(
        &mut details,
        "rollout DB stale row sample",
        stale_rows.iter().map(|row| row.rollout_path.as_path()),
    );
    push_path_samples(
        &mut details,
        "rollout DB archive mismatch sample",
        archive_mismatches
            .iter()
            .map(|row| row.rollout_path.as_path()),
    );
    push_samples(
        &mut details,
        "rollout DB duplicate rollout thread id sample",
        duplicate_thread_ids.iter().map(String::as_str),
    );
    push_path_samples(
        &mut details,
        "rollout DB duplicate DB path sample",
        duplicate_paths.iter().map(PathBuf::as_path),
    );

    // Each flag below gates both the overall status and its own issue entry.
    let scan_clean =
        scan.scan_errors.is_empty() && scan.malformed_names.is_empty() && scan_complete;
    let has_missing = !(missing_active.is_empty() && missing_archived.is_empty());
    let has_stale = !stale_rows.is_empty();
    let has_mismatch = !archive_mismatches.is_empty();
    let has_duplicates = !(duplicate_thread_ids.is_empty() && duplicate_paths.is_empty());
    let in_agreement =
        scan_clean && !has_missing && !has_stale && !has_mismatch && !has_duplicates;

    let (status, summary) = if in_agreement {
        (
            CheckStatus::Ok,
            "rollout files and state DB thread inventory agree",
        )
    } else {
        (
            CheckStatus::Warning,
            "rollout files and state DB thread inventory differ",
        )
    };
    let mut check = DoctorCheck::new(CHECK_ID, CHECK_CATEGORY, status, summary).details(details);

    if has_missing {
        let issue = DoctorIssue::new(
            CheckStatus::Warning,
            "rollout files are missing from the state DB",
        )
        .measured(format!(
            "{} active, {} archived",
            missing_active.len(),
            missing_archived.len()
        ))
        .expected("every rollout file has a matching threads row");
        check = check.issue(issue);
    }
    if has_stale {
        let issue = DoctorIssue::new(
            CheckStatus::Warning,
            "state DB rows point at missing or unusable rollout files",
        )
        .measured(format!("{} stale rows", stale_rows.len()))
        .expected("every state DB rollout path is a file on disk");
        check = check.issue(issue);
    }
    if has_mismatch {
        let issue = DoctorIssue::new(
            CheckStatus::Warning,
            "state DB archive flags disagree with rollout file locations",
        )
        .measured(format!("{} mismatched rows", archive_mismatches.len()))
        .expected("rows under archived_sessions are archived and rows under sessions are active");
        check = check.issue(issue);
    }
    if has_duplicates {
        let issue = DoctorIssue::new(
            CheckStatus::Warning,
            "duplicate thread inventory entries found",
        )
        .measured(format!(
            "{} duplicate rollout thread ids, {} duplicate DB paths",
            duplicate_thread_ids.len(),
            duplicate_paths.len()
        ))
        .expected("one rollout path and thread id per thread")
        .remedy("Attach the doctor report to a bug report so support can inspect samples.");
        check = check.issue(issue);
    }
    if !scan_clean {
        let issue = DoctorIssue::new(
            CheckStatus::Warning,
            "rollout scan was incomplete or found bad files",
        )
        .measured(format!(
            "{} scan errors, {} malformed names, scan cap reached: {}",
            scan.scan_errors.len(),
            scan.malformed_names.len(),
            scan.reached_scan_cap
        ))
        .expected("rollout directories are fully scannable")
        .remedy("Check file permissions and unexpected files under CODEX_HOME sessions.");
        check = check.issue(issue);
    }
    check
}