in scripts/load_bq.py [0:0]
def load_slurm_jobs(start, end):
    states = ",".join(
        (
            "BOOT_FAIL",
            "CANCELLED",
            "COMPLETED",
            "DEADLINE",
            "FAILED",
            "NODE_FAIL",
            "OUT_OF_MEMORY",
            "PREEMPTED",
            "REQUEUED",
            "REVOKED",
            "TIMEOUT",
        )
    )
    start_iso = start.isoformat(timespec="seconds")
    end_iso = end.isoformat(timespec="seconds")
    # slurm_fields and bq_fields will be in matching order
    slurm_fields = ",".join(slurm_field_map.values())
    bq_fields = slurm_field_map.keys()
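    # sacct flags: -X reports allocations only (no job steps), -D/--duplicates
    # includes requeued duplicates, and --parsable2 emits pipe-delimited output
    # without a trailing delimiter.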
    cmd = (
        f"{SACCT} --start {start_iso} --end {end_iso} -X -D --format={slurm_fields} "
        f"--state={states} --parsable2 --noheader --allusers --duplicates"
    )
    text = run(cmd).stdout.splitlines()
    # zip pairs each bq field name with the corresponding value from the sacct output
    jobs = [dict(zip(bq_fields, line.split("|"))) for line in text]
    # The job index cache lets us skip jobs that were already sent, which
    # avoids a race condition when updating the database.
    with shelve.open(str(job_idx_cache_path), flag="r") as job_idx_cache:
        job_rows = [
            make_job_row(job)
            for job in jobs
            if str(job["job_db_uuid"]) not in job_idx_cache
        ]
    return job_rows
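
# --- Illustrative usage sketch (not part of scripts/load_bq.py) ---
# A minimal sketch of how this loader might be driven, assuming the caller
# passes datetime objects for the window and inserts the returned rows into
# BigQuery before marking them in the job index cache. insert_rows_into_bq is
# a hypothetical helper, and the sketch assumes the rows built by make_job_row
# still carry the job_db_uuid field; the real script may differ.
import shelve
from datetime import datetime, timedelta

def sync_window(minutes=30):
    end = datetime.now()
    start = end - timedelta(minutes=minutes)
    job_rows = load_slurm_jobs(start, end)
    if not job_rows:
        return
    insert_rows_into_bq(job_rows)  # hypothetical BigQuery insert step
    # Record uuids only after a successful insert, so a failed insert leaves
    # the jobs eligible to be picked up again in the next window.
    with shelve.open(str(job_idx_cache_path), flag="c") as job_idx_cache:
        for row in job_rows:
            job_idx_cache[str(row["job_db_uuid"])] = True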