in community/front-end/ofe/infrastructure_files/gcs_bucket/clusters/ansible_setup/roles/c2_daemon/files/ghpcfe_c2daemon.py [0:0]
def cb_run_job(message, **kwargs):
"""Handler for job submission and monitoring"""
if not "ackid" in message:
logger.error(
"Refusing RUN_JOB message without ackid (message was %s)",
message,
)
return
ackid = message["ackid"]
response = {"ackid": ackid}
if not _verify_params(
message, ["job_id", "login_uid", "run_script", "num_nodes", "partition"]
):
logger.error("NOT STARTING JOB. Missing required field(s)")
response["status"] = "e"
response["message"] = "Missing Key Info"
send_message("ACK", response)
return
jobid = message["job_id"]
response["job_id"] = jobid
logger.info("Starting job %s:%s", jobid, message)
if int(message["login_uid"]) == 0:
(username, uid, gid, homedir) = ("root", 0, 0, "/home/root_jobs")
else:
try:
(username, uid, gid, homedir) = _verify_oslogin_user(
message["login_uid"]
)
except KeyError:
logger.error(
"User UID %s is not an allowed OS Login user", message["login_uid"]
)
response["status"] = "e"
response["message"] = (
f"User with uid={message['login_uid']} is not allowed to submit "
"jobs to this cluster"
)
send_message("ACK", response)
return
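# Create a per-job working directory under the user's home and hand ownership to the submitting user.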
job_dir = Path(homedir) / "jobs" / str(jobid)
job_dir.mkdir(parents=True, exist_ok=True)
os.chown(job_dir, uid, gid)
(slurm_jobid, script_path, outfile, errfile) = _submit_job(
uid=uid, gid=gid, job_dir=job_dir, **message
)
if not slurm_jobid:
# There was an error - stdout, stderr in outfile, errfile
logger.error("Failed to run batch submission")
_upload_log_blobs(
{
f"jobs/{jobid}/{script_path.name}": script_path.read_text(),
f"jobs/{jobid}/stdout": outfile,
f"jobs/{jobid}/stderr": errfile,
}
)
response["status"] = "e"
send_message("ACK", response)
return
logger.info("Job %s queued as slurm job %s", jobid, slurm_jobid)
response["status"] = "q"
response["slurm_job_id"] = slurm_jobid
send_message("UPDATE", response)
state = "PENDING"
while state in ["PENDING", "CONFIGURING"]:
time.sleep(30)
state = _slurm_get_job_state(slurm_jobid)
if state == "RUNNING":
logger.info("Job %s running as slurm job %s", jobid, slurm_jobid)
response["status"] = "r"
send_message("UPDATE", response)
while state in ["RUNNING"]:
time.sleep(30)
state = _slurm_get_job_state(slurm_jobid)
logger.info(
"Job %s (slurm %s) completed with result %s", jobid, slurm_jobid, state
)
status = "c" if state in ["COMPLETED", "COMPLETING"] else "e"
response["status"] = "u"
send_message("UPDATE", response)
try:
slurm_job_info = _slurm_get_job_info(slurm_jobid)
response["job_runtime"] = (
slurm_job_info["end_time"]["number"] - slurm_job_info["start_time"]["number"]
)
except KeyError:
logger.warning(
"Job data from SLURM did not include start time and end time"
)
except Exception as err:
logger.error("Unexpected error: %s", err)
kpi = job_dir / "kpi.json"
if kpi.is_file():
with kpi.open("rb") as kpi_fh:
kpi_info = json.load(kpi_fh)
response.update(kpi_info)
logger.info("Uploading log files for %s", jobid)
try:
_upload_log_files(
{
f"jobs/{jobid}/{script_path.name}": script_path.as_posix(),
f"jobs/{jobid}/stdout": Path(job_dir / "job.out").as_posix(),
f"jobs/{jobid}/stderr": Path(job_dir / "job.err").as_posix(),
}
)
except Exception as err:
logger.error("Failed to upload log files", exc_info=err)
response["status"] = status
send_message("ACK", response)
if kwargs.get("cleanup_choice", "n") in [
"a",
"s" if status == "c" else "e",
]:
# The job directory still has contents, so remove it recursively.
shutil.rmtree(job_dir)