in metaflow/plugins/kubernetes/kubernetes_jobsets.py [0:0]
def _construct_jobset_logical_status(jobset, control_pod=None):
    """Condense a JobSet manifest and its control pod into a `JobsetStatus`.

    Parameters
    ----------
    jobset : dict
        JobSet object; its "spec" and "status" entries are read here.
    control_pod : optional
        Forwarded unchanged to `_derive_pod_status_and_status_code`, which
        supplies the control pod status, exit code and failure flag.

    Returns
    -------
    JobsetStatus
        When `_basic_validation_for_js` rejects the manifest, only
        `status_unknown` is set (all other flags are False and the control
        pod fields are None). Otherwise the fields are derived from the
        spec, the replicated job statuses, the jobset conditions and the
        control pod.

    Raises
    ------
    IndexError
        If the spec lists no replicated job named "worker" or "control"
        (assuming validation has not already ruled that out).
    """
    if not _basic_validation_for_js(jobset):
        # Nothing can be inferred from this manifest: leave every flag unset
        # and only mark the status as unknown.
        unset_flags = dict.fromkeys(
            (
                "control_started",
                "control_completed",
                "workers_are_suspended",
                "workers_have_started",
                "all_jobs_are_suspended",
                "jobset_finished",
                "jobset_failed",
                "jobset_was_terminated",
                "worker_pods_failed",
                "control_pod_failed",
                "some_jobs_are_running",
            ),
            False,
        )
        return JobsetStatus(
            status_unknown=True,
            control_exit_code=None,
            control_pod_status=None,
            **unset_flags
        )

    js_status = jobset.get("status")
    all_jobs_are_suspended = jobset.get("spec", {}).get("suspend", False)

    def _declared_replicas(job_name):
        # Replica count of the first replicated job in the spec named `job_name`.
        matching = [
            replicated_job["replicas"]
            for replicated_job in jobset.get("spec").get("replicatedJobs", [])
            if replicated_job["name"] == job_name
        ]
        return matching[0]

    total_worker_jobs = _declared_replicas("worker")
    total_control_jobs = _declared_replicas("control")
    # Both replica counts being zero is reported as a terminated jobset.
    jobset_was_terminated = total_worker_jobs == 0 and total_control_jobs == 0

    control_started = control_completed = False
    workers_have_started = workers_are_suspended = False
    worker_pods_failed = jobset_failed = some_jobs_are_running = False

    for rj_status in _retrieve_replicated_job_statuses(jobset):
        active_count = rj_status["active"]
        if active_count > 0:
            some_jobs_are_running = True

        role = rj_status["name"]
        if role == "control":
            control_completed = rj_status["succeeded"] > 0
            # The control job counts as started once it is active or has
            # already succeeded.
            control_started = active_count > 0 or control_completed
            if rj_status["failed"] > 0:
                jobset_failed = True
        elif role == "worker":
            # Workers count as started only when all declared replicas are active.
            workers_have_started = active_count == total_worker_jobs
            if "suspended" in rj_status:
                # `replicatedJobStatus` only gained this field in v0.3.0,
                # so it may be absent on older versions.
                workers_are_suspended = rj_status["suspended"] > 0
            if rj_status["failed"] > 0:
                worker_pods_failed = jobset_failed = True

    jobset_finished = False
    for condition in js_status.get("conditions") or []:
        condition_type = condition["type"]
        if condition_type == "Completed":
            jobset_finished = True
        elif condition_type == "Failed":
            jobset_failed = True

    control_pod_status, control_exit_code, control_pod_failed = (
        _derive_pod_status_and_status_code(control_pod)
    )

    return JobsetStatus(
        control_started=control_started,
        control_completed=control_completed,
        workers_are_suspended=workers_are_suspended,
        workers_have_started=workers_have_started,
        all_jobs_are_suspended=all_jobs_are_suspended,
        jobset_finished=jobset_finished,
        jobset_failed=jobset_failed,
        status_unknown=False,
        jobset_was_terminated=jobset_was_terminated,
        control_exit_code=control_exit_code,
        control_pod_status=control_pod_status,
        worker_pods_failed=worker_pods_failed,
        control_pod_failed=control_pod_failed,
        some_jobs_are_running=some_jobs_are_running,
    )