in codex-rs/cli/src/doctor/thread_inventory.rs [212:421]
/// Builds the thread-inventory doctor check from a rollout directory scan and the
/// thread rows read from the state DB.
///
/// Count lines and sample lines are appended to the caller-supplied `details`, in a
/// fixed order, before they are attached to the returned check. The check is
/// `CheckStatus::Ok` only when the scan reported no errors, no malformed names and
/// did not reach its cap, and no missing, stale, mismatched or duplicate entries
/// were found; otherwise it is `CheckStatus::Warning` with one issue per problem
/// category.
///
/// Stale-row and archive-mismatch detection are only evaluated when the scan did
/// not reach its cap; in that case their counts are rendered through
/// `count_or_skipped` with `scan_complete == false`.
fn parity_check_from_scan_and_rows(
    codex_home: &Path,
    scan: RolloutScan,
    rows: Vec<ThreadStateAuditRow>,
    mut details: Vec<String>,
) -> DoctorCheck {
    // Index the scanned rollout files by their key.
    let files_by_key: HashMap<_, _> = scan
        .files
        .iter()
        .map(|file| (file.key.clone(), file))
        .collect();

    // Group DB rows by the key derived from their rollout path; several rows may
    // share one key, which is what the duplicate-path check inspects.
    let mut db_rows_by_key: HashMap<PathBuf, Vec<&ThreadStateAuditRow>> = HashMap::new();
    for db_row in &rows {
        let key = path_key(&db_row.rollout_path);
        db_rows_by_key.entry(key).or_default().push(db_row);
    }

    let missing_active =
        missing_rollout_paths(&scan.files, &db_rows_by_key, /*archived*/ false);
    let missing_archived =
        missing_rollout_paths(&scan.files, &db_rows_by_key, /*archived*/ true);

    let scan_complete = !scan.reached_scan_cap;

    // Rows whose rollout path is not a file on disk. The `scan_complete` guard
    // short-circuits, so the filesystem is not probed after a capped scan.
    let stale: Vec<&ThreadStateAuditRow> = rows
        .iter()
        .filter(|row| scan_complete && !row.rollout_path.is_file())
        .collect();

    // Rows whose archived flag disagrees with the expected one. The expectation
    // comes from the scanned file when the scan saw it, otherwise from the path
    // itself, provided the path still is a file on disk.
    let mut mismatched: Vec<&ThreadStateAuditRow> = Vec::new();
    if scan_complete {
        for row in &rows {
            let expected_archived = match files_by_key.get(&path_key(&row.rollout_path)) {
                Some(file) => Some(file.archived),
                None if row.rollout_path.is_file() => {
                    archived_from_rollout_path(codex_home, &row.rollout_path)
                }
                None => None,
            };
            if let Some(expected) = expected_archived {
                if expected != row.archived {
                    mismatched.push(row);
                }
            }
        }
    }

    let dup_thread_ids = duplicate_rollout_thread_ids(&scan.files);
    let dup_db_paths = duplicate_db_paths(&db_rows_by_key);

    let archived_rows = rows.iter().filter(|row| row.archived).count();
    let active_rows = rows.len() - archived_rows;

    // Count lines, in the order they appear in the report.
    details.push(format!("rollout DB rows: {}", rows.len()));
    details.push(format!("rollout DB active rows: {active_rows}"));
    details.push(format!("rollout DB archived rows: {archived_rows}"));
    details.push(format!(
        "rollout DB missing active rows: {}",
        missing_active.len()
    ));
    details.push(format!(
        "rollout DB missing archived rows: {}",
        missing_archived.len()
    ));
    details.push(format!(
        "rollout DB stale rows: {}",
        count_or_skipped(stale.len(), scan_complete)
    ));
    details.push(format!(
        "rollout DB archive mismatches: {}",
        count_or_skipped(mismatched.len(), scan_complete)
    ));
    details.push(format!(
        "rollout DB duplicate rollout thread ids: {}",
        dup_thread_ids.len()
    ));
    details.push(format!(
        "rollout DB duplicate DB paths: {}",
        dup_db_paths.len()
    ));
    details.push(format!(
        "rollout DB model providers: {}",
        count_summary(rows.iter().map(|row| row.model_provider.as_str()))
    ));
    details.push(format!(
        "rollout DB sources: {}",
        count_summary(rows.iter().map(|row| source_category(&row.source)))
    ));

    // Sample lines, one group per problem category.
    push_path_samples(
        &mut details,
        "rollout DB missing active sample",
        missing_active.iter().copied(),
    );
    push_path_samples(
        &mut details,
        "rollout DB missing archived sample",
        missing_archived.iter().copied(),
    );
    push_path_samples(
        &mut details,
        "rollout DB stale row sample",
        stale.iter().map(|row| row.rollout_path.as_path()),
    );
    push_path_samples(
        &mut details,
        "rollout DB archive mismatch sample",
        mismatched.iter().map(|row| row.rollout_path.as_path()),
    );
    push_samples(
        &mut details,
        "rollout DB duplicate rollout thread id sample",
        dup_thread_ids.iter().map(String::as_str),
    );
    push_path_samples(
        &mut details,
        "rollout DB duplicate DB path sample",
        dup_db_paths.iter().map(PathBuf::as_path),
    );

    // One flag per issue category; any raised flag downgrades the check.
    let has_missing = !missing_active.is_empty() || !missing_archived.is_empty();
    let has_stale = !stale.is_empty();
    let has_mismatch = !mismatched.is_empty();
    let has_duplicates = !dup_thread_ids.is_empty() || !dup_db_paths.is_empty();
    let scan_had_problems = !scan.scan_errors.is_empty()
        || !scan.malformed_names.is_empty()
        || scan.reached_scan_cap;

    let any_problem =
        has_missing || has_stale || has_mismatch || has_duplicates || scan_had_problems;
    let (status, summary) = if any_problem {
        (
            CheckStatus::Warning,
            "rollout files and state DB thread inventory differ",
        )
    } else {
        (
            CheckStatus::Ok,
            "rollout files and state DB thread inventory agree",
        )
    };

    let mut check = DoctorCheck::new(CHECK_ID, CHECK_CATEGORY, status, summary).details(details);

    if has_missing {
        let issue = DoctorIssue::new(
            CheckStatus::Warning,
            "rollout files are missing from the state DB",
        )
        .measured(format!(
            "{} active, {} archived",
            missing_active.len(),
            missing_archived.len()
        ))
        .expected("every rollout file has a matching threads row");
        check = check.issue(issue);
    }

    if has_stale {
        let issue = DoctorIssue::new(
            CheckStatus::Warning,
            "state DB rows point at missing or unusable rollout files",
        )
        .measured(format!("{} stale rows", stale.len()))
        .expected("every state DB rollout path is a file on disk");
        check = check.issue(issue);
    }

    if has_mismatch {
        let issue = DoctorIssue::new(
            CheckStatus::Warning,
            "state DB archive flags disagree with rollout file locations",
        )
        .measured(format!("{} mismatched rows", mismatched.len()))
        .expected("rows under archived_sessions are archived and rows under sessions are active");
        check = check.issue(issue);
    }

    if has_duplicates {
        let issue = DoctorIssue::new(
            CheckStatus::Warning,
            "duplicate thread inventory entries found",
        )
        .measured(format!(
            "{} duplicate rollout thread ids, {} duplicate DB paths",
            dup_thread_ids.len(),
            dup_db_paths.len()
        ))
        .expected("one rollout path and thread id per thread")
        .remedy("Attach the doctor report to a bug report so support can inspect samples.");
        check = check.issue(issue);
    }

    if scan_had_problems {
        let issue = DoctorIssue::new(
            CheckStatus::Warning,
            "rollout scan was incomplete or found bad files",
        )
        .measured(format!(
            "{} scan errors, {} malformed names, scan cap reached: {}",
            scan.scan_errors.len(),
            scan.malformed_names.len(),
            scan.reached_scan_cap
        ))
        .expected("rollout directories are fully scannable")
        .remedy("Check file permissions and unexpected files under CODEX_HOME sessions.");
        check = check.issue(issue);
    }

    check
}