def _construct_jobset_logical_status()

in metaflow/plugins/kubernetes/kubernetes_jobsets.py [0:0]


def _construct_jobset_logical_status(jobset, control_pod=None):
    """Summarize a JobSet object (and optionally its control pod) as a JobsetStatus.

    `jobset` is the JobSet resource as a dict. This function reads
    `spec.suspend`, `spec.replicatedJobs[*].name/replicas` and
    `status.conditions[*].type/status` from it. Per-replicated-job counters
    (`name`, `active`, `succeeded`, `failed`, optionally `suspended`) come
    from `_retrieve_replicated_job_statuses(jobset)`.

    `control_pod` is forwarded unchanged to
    `_derive_pod_status_and_status_code`. That helper supplies the control
    pod status, exit code and failure flag. (Presumably a pod object or None;
    confirm against callers.)

    If `_basic_validation_for_js(jobset)` is falsy, the function returns a
    status with `status_unknown=True` and every other flag False/None.
    Otherwise the flags are derived as follows:

    - `all_jobs_are_suspended`: `spec.suspend`.
    - `jobset_was_terminated`: both the "worker" and the "control"
      replicated jobs exist and have 0 replicas.
    - `some_jobs_are_running`: any replicated job has `active > 0`.
    - `control_started` / `control_completed`: from the control job's
      active/succeeded counters.
    - `workers_have_started`: the worker job's active count equals the
      worker replica count.
    - `workers_are_suspended`: the worker job's `suspended` counter is > 0
      (only when that field is reported).
    - `worker_pods_failed`: the worker job's `failed` counter is > 0.
    - `jobset_failed`: the control or worker job has `failed > 0`, or a
      holding "Failed" condition exists.
    - `jobset_finished`: a holding "Completed" condition exists.
    """
    if not _basic_validation_for_js(jobset):
        return JobsetStatus(
            control_started=False,
            control_completed=False,
            workers_are_suspended=False,
            workers_have_started=False,
            all_jobs_are_suspended=False,
            jobset_finished=False,
            jobset_failed=False,
            status_unknown=True,
            jobset_was_terminated=False,
            control_exit_code=None,
            control_pod_status=None,
            worker_pods_failed=False,
            control_pod_failed=False,
            some_jobs_are_running=False,
        )

    # Treat a missing or null `spec`/`status` as empty rather than raising
    # AttributeError on `None.get(...)`.
    spec = jobset.get("spec") or {}
    js_status = jobset.get("status") or {}
    replicated_jobs = spec.get("replicatedJobs") or []

    control_started = False
    control_completed = False
    workers_are_suspended = False
    workers_have_started = False
    all_jobs_are_suspended = spec.get("suspend", False)
    jobset_finished = False
    jobset_failed = False
    status_unknown = False
    worker_pods_failed = False
    some_jobs_are_running = False

    def _replicas_of(job_name):
        # Replica count of the first replicated job called `job_name`. Returns
        # None when the spec has no such job, instead of raising IndexError.
        return next(
            (rj["replicas"] for rj in replicated_jobs if rj["name"] == job_name),
            None,
        )

    total_worker_jobs = _replicas_of("worker")
    total_control_jobs = _replicas_of("control")

    # Both replicated jobs scaled down to zero replicas is read as the JobSet
    # having been terminated. A missing job (None) never matches 0.
    jobset_was_terminated = total_worker_jobs == 0 and total_control_jobs == 0

    for job_status in _retrieve_replicated_job_statuses(jobset):
        if job_status["active"] > 0:
            some_jobs_are_running = True

        if job_status["name"] == "control":
            control_started = job_status["active"] > 0 or job_status["succeeded"] > 0
            control_completed = job_status["succeeded"] > 0
            if job_status["failed"] > 0:
                jobset_failed = True

        if job_status["name"] == "worker":
            # "Started" here means every configured worker replica is active.
            workers_have_started = job_status["active"] == total_worker_jobs
            if "suspended" in job_status:
                # `replicatedJobStatus` didn't have `suspend` field
                #  until v0.3.0. So we need to account for that.
                workers_are_suspended = job_status["suspended"] > 0
            if job_status["failed"] > 0:
                worker_pods_failed = True
                jobset_failed = True

    for condition in js_status.get("conditions") or []:
        # Kubernetes conditions carry a "True"/"False"/"Unknown" status. Only a
        # condition that currently holds counts. A condition with no status is
        # treated as holding, matching the previous type-only check.
        if condition.get("status", "True") != "True":
            continue
        condition_type = condition.get("type")
        if condition_type == "Completed":
            jobset_finished = True
        elif condition_type == "Failed":
            jobset_failed = True

    (
        overall_status,
        control_exit_code,
        control_pod_failed,
    ) = _derive_pod_status_and_status_code(control_pod)

    return JobsetStatus(
        control_started=control_started,
        control_completed=control_completed,
        workers_are_suspended=workers_are_suspended,
        workers_have_started=workers_have_started,
        all_jobs_are_suspended=all_jobs_are_suspended,
        jobset_finished=jobset_finished,
        jobset_failed=jobset_failed,
        status_unknown=status_unknown,
        jobset_was_terminated=jobset_was_terminated,
        control_exit_code=control_exit_code,
        control_pod_status=overall_status,
        worker_pods_failed=worker_pods_failed,
        control_pod_failed=control_pod_failed,
        some_jobs_are_running=some_jobs_are_running,
    )