in .codex/skills/babysit-pr/scripts/gh_pr_watch.py [0:0]
def failed_jobs_from_workflow_runs(repo, runs, head_sha):
    """Return a sorted list describing the failed jobs of runs at ``head_sha``.

    A run is inspected only when it is a dict, its ``head_sha`` equals
    ``head_sha``, and it has a non-empty ``id``.  Runs whose status is
    "completed" (compared case-insensitively) with a conclusion outside
    ``FAILED_RUN_CONCLUSIONS`` are skipped without fetching their jobs; every
    other run has its jobs fetched through ``get_jobs_for_run``.

    Args:
        repo: Repository identifier; passed to ``get_jobs_for_run`` and
            interpolated into each ``logs_endpoint`` path.
        runs: Iterable of workflow-run records; non-dict items are ignored.
        head_sha: Commit SHA that a run's ``head_sha`` must match.

    Returns:
        A new list with one dict per job whose conclusion is in
        ``FAILED_RUN_CONCLUSIONS``, ordered by workflow name, then job name,
        then job id (all compared as strings).  ``logs_endpoint`` is ``None``
        when the job has no usable id.
    """

    def _ordering(entry):
        # Every component is coerced to ``str`` so mixed or missing values
        # still compare cleanly.
        return (
            str(entry.get("workflow_name") or ""),
            str(entry.get("job_name") or ""),
            str(entry.get("job_id") or ""),
        )

    collected = []
    for workflow_run in runs:
        if (
            not isinstance(workflow_run, dict)
            or str(workflow_run.get("head_sha") or "") != head_sha
        ):
            continue

        run_id = workflow_run.get("id")
        if run_id in (None, ""):
            continue

        status_text = str(workflow_run.get("status") or "")
        conclusion_text = str(workflow_run.get("conclusion") or "")
        finished_without_failure = (
            status_text.lower() == "completed"
            and conclusion_text not in FAILED_RUN_CONCLUSIONS
        )
        if finished_without_failure:
            continue

        run_jobs = get_jobs_for_run(repo, run_id)

        # Fields shared by every failed job reported for this run.
        run_fields = {
            "run_id": run_id,
            "workflow_name": (
                workflow_run.get("name") or workflow_run.get("display_title") or ""
            ),
            "run_status": status_text,
            "run_conclusion": conclusion_text,
        }

        for job_record in run_jobs:
            if not isinstance(job_record, dict):
                continue

            job_conclusion = str(job_record.get("conclusion") or "")
            if job_conclusion not in FAILED_RUN_CONCLUSIONS:
                continue

            job_id = job_record.get("id")
            has_job_id = job_id not in (None, "")
            collected.append(
                {
                    **run_fields,
                    "job_id": job_id,
                    "job_name": str(job_record.get("name") or ""),
                    "status": str(job_record.get("status") or ""),
                    "conclusion": job_conclusion,
                    "html_url": str(job_record.get("html_url") or ""),
                    "logs_endpoint": (
                        f"repos/{repo}/actions/jobs/{job_id}/logs" if has_job_id else None
                    ),
                }
            )

    return sorted(collected, key=_ordering)