def cb_run_job()

in community/front-end/ofe/infrastructure_files/gcs_bucket/clusters/ansible_setup/roles/c2_daemon/files/ghpcfe_c2daemon.py [0:0]


def cb_run_job(message, **kwargs):
    """Submit a user's job to Slurm and monitor it until completion.

    Expects ``message`` to contain at least: ``ackid``, ``job_id``,
    ``login_uid``, ``run_script``, ``num_nodes``, ``partition``.  Sends an
    immediate ACK on validation failure, streams UPDATE messages as the job
    moves through queued -> running -> finished, uploads job logs, and sends
    a final ACK with status ``"c"`` (completed) or ``"e"`` (error).

    Recognized ``kwargs``:
        cleanup_choice: "a" (always), "s" (on success), "e" (on error),
            anything else / absent (never) -- controls removal of the job dir.

    Blocks (polls Slurm every 30 s) until the job leaves the RUNNING state.
    """
    if "ackid" not in message:
        logger.error(
            "Refusing RUN_JOB message without ackid (message was %s)",
            message,
        )
        return
    ackid = message["ackid"]
    response = {"ackid": ackid}

    if not _verify_params(
        message, ["job_id", "login_uid", "run_script", "num_nodes", "partition"]
    ):
        logger.error("NOT STARTING JOB.  Missing required field(s)")
        response["status"] = "e"
        response["message"] = "Missing Key Info"
        send_message("ACK", response)
        return

    jobid = message["job_id"]
    response["job_id"] = jobid

    logger.info("Starting job %s:%s", jobid, message)

    if int(message["login_uid"]) == 0:
        # Root jobs get a dedicated home dir rather than OS-Login lookup
        (username, uid, gid, homedir) = ("root", 0, 0, "/home/root_jobs")
    else:
        try:
            (username, uid, gid, homedir) = _verify_oslogin_user(
                message["login_uid"]
            )
        except KeyError:
            logger.error(
                "User UID %s not OS-Login allowed", message["login_uid"]
            )
            response["status"] = "e"
            # BUGFIX: the lookup failed, so username/uid were never bound;
            # referencing them here raised NameError.  Report the uid we have.
            response["message"] = (
                f"User uid={message['login_uid']} is not allowed to submit "
                "jobs to this cluster"
            )
            send_message("ACK", response)
            return

    # Per-job working directory owned by the submitting user
    job_dir = Path(homedir) / "jobs" / str(jobid)
    job_dir.mkdir(parents=True, exist_ok=True)
    os.chown(job_dir, uid, gid)

    (slurm_jobid, script_path, outfile, errfile) = _submit_job(
        uid=uid, gid=gid, job_dir=job_dir, **message
    )
    if not slurm_jobid:
        # Submission failed - sbatch stdout/stderr are in outfile/errfile
        logger.error("Failed to run batch submission")
        _upload_log_blobs(
            {
                f"jobs/{jobid}/{script_path.name}": script_path.read_text(),
                f"jobs/{jobid}/stdout": outfile,
                f"jobs/{jobid}/stderr": errfile,
            }
        )
        response["status"] = "e"
        send_message("ACK", response)
        return
    logger.info("Job %s queued as slurm job %s", jobid, slurm_jobid)
    response["status"] = "q"
    response["slurm_job_id"] = slurm_jobid
    send_message("UPDATE", response)

    # Poll until the job leaves the queued/configuring phase
    state = "PENDING"
    while state in ["PENDING", "CONFIGURING"]:
        time.sleep(30)
        state = _slurm_get_job_state(slurm_jobid)

    if state == "RUNNING":
        logger.info("Job %s running as slurm job %s", jobid, slurm_jobid)
        response["status"] = "r"
        send_message("UPDATE", response)

    # Poll until the job stops running
    while state in ["RUNNING"]:
        time.sleep(30)
        state = _slurm_get_job_state(slurm_jobid)

    logger.info(
        "Job %s (slurm %s) completed with result %s", jobid, slurm_jobid, state
    )
    # Final status is decided now, but sent only after log upload; "u"
    # signals the frontend that post-processing/upload is in progress.
    status = "c" if state in ["COMPLETED", "COMPLETING"] else "e"
    response["status"] = "u"
    send_message("UPDATE", response)

    try:
        slurm_job_info = _slurm_get_job_info(slurm_jobid)
        response["job_runtime"] = (
            slurm_job_info["end_time"]["number"] - slurm_job_info["start_time"]["number"]
        )
    except KeyError:
        logger.warning(
            "Job data from SLURM did not include start time and end time"
        )
    except Exception as E:
        # Best-effort: runtime is informational only, never fatal
        logger.error("Unexpected error: %s", E)

    # Merge any KPI data the job produced into the final response
    kpi = job_dir / "kpi.json"
    if kpi.is_file():
        with kpi.open("rb") as kpi_fh:
            kpi_info = json.load(kpi_fh)
            response.update(kpi_info)

    logger.info("Uploading log files for %s", jobid)
    try:
        _upload_log_files(
            {
                f"jobs/{jobid}/{script_path.name}": script_path.as_posix(),
                f"jobs/{jobid}/stdout": Path(job_dir / "job.out").as_posix(),
                f"jobs/{jobid}/stderr": Path(job_dir / "job.err").as_posix(),
            }
        )
    except Exception as err:
        logger.error("Failed to upload log files", exc_info=err)

    response["status"] = status
    send_message("ACK", response)

    # Clean up when the policy matches the outcome: "a" always, "s" only on
    # success, "e" only on error (the one-element comparison encodes that).
    if kwargs.get("cleanup_choice", "n") in [
        "a",
        "s" if status == "c" else "e",
    ]:
        # Need to empty the job dir before removing
        shutil.rmtree(job_dir)