in src/hyperpod_cli/commands/job.py [0:0]
def patch_job(patch_type: str, job_name: str, namespace: Optional[str]):
    """Suspend or unsuspend a job by patching the Kueue workload that backs it.

    The job is looked up by name, its ``metadata.uid`` is used to find the
    workload labelled with that uid, and the workload's ``spec.active`` flag
    is patched to ``False`` (suspend) or ``True`` (unsuspend).

    Args:
        patch_type: Requested operation; must be one of
            ``JobPatchType.get_values()``.
        job_name: Name of the job whose workload should be patched.
        namespace: Namespace of the job. Falls back to ``"default"`` when
            ``None``.

    Raises:
        SystemExit: With exit status 1 when the patch type is unsupported,
            the job uid cannot be determined, the number of matching
            workloads is not exactly one, or the workload has no name.
    """
    # Function-local import so this block does not depend on the file's import
    # section. ``sys.exit`` is used instead of the ``exit`` builtin, which is
    # injected by the ``site`` module and intended for interactive sessions.
    import sys

    if patch_type not in JobPatchType.get_values():
        logger.error(f"Unsupported patch type: '{patch_type}'")
        sys.exit(1)

    if namespace is None:
        # TODO: Unblock this after better customer onboarding experience for Crescendo.
        # Namespace discovery is disabled until then. When re-enabling, call
        # DiscoverNamespaces().discover_accessible_namespace() with a
        # V1ResourceAttributes(verb="patch", group=KUEUE_CUSTOM_OBJECT_GROUP,
        # resource=WORKLOAD_CUSTOM_OBJECT_PLURAL) template.
        namespace = "default"

    patch_type_enum = JobPatchType(patch_type)
    k8s_client = KubernetesClient()

    # Step 1: get the pytorch job definition, UID in metadata is what we need to fetch
    # the corresponding workload managed by kueue.
    training_job = k8s_client.get_job(job_name, namespace) or {}
    # ``or {}`` also covers a "metadata" key that is present but set to None.
    uid = (training_job.get("metadata") or {}).get("uid")
    if not uid:
        logger.error("Cannot patch the job because uid cannot be found in metadata.")
        sys.exit(1)

    # Step 2: get the workload by filtering on the uid retrieved in step 1. Exactly one
    # workload is expected; otherwise log an error and exit because which workload should
    # be patched is uncertain. This should not ever happen.
    workload_label_selector = f"{KUEUE_JOB_UID_LABEL_KEY}={uid}"
    workload_list = k8s_client.get_workload_by_label(workload_label_selector, namespace) or {}
    workloads = workload_list.get("items") or []
    if not workloads:
        logger.error(f"No workload found for the job to be patched: '{job_name}'")
        sys.exit(1)
    if len(workloads) > 1:
        logger.error(f"Only exact one workload is expected to be found for job: '{job_name}', but found {len(workloads)}")
        sys.exit(1)

    workload_name = (workloads[0].get("metadata") or {}).get("name")
    if not workload_name:
        # Without a name there is nothing to address the patch request to.
        logger.error(f"Cannot patch the job because the workload found for job '{job_name}' has no name in metadata.")
        sys.exit(1)

    # Step 3: Decide the patch body based on the job patch type specified in command.
    if patch_type_enum == JobPatchType.SUSPEND:
        active, outcome = False, "suspended"
    elif patch_type_enum == JobPatchType.UNSUSPEND:
        active, outcome = True, "unsuspended"
    else:
        # Defensive: reached only if JobPatchType has a member that is not handled here.
        logger.info("Found unsupported patch type. No operation is performed.")
        return

    k8s_client.patch_workload(workload_name, namespace, {"spec": {"active": active}})
    logger.info(f"Job {job_name} is {outcome}.")