def patch_job()

in src/hyperpod_cli/commands/job.py [0:0]


def patch_job(patch_type: str, job_name: str, namespace: Optional[str]):
    """Suspend or unsuspend a job by patching the workload Kueue manages for it.

    The job is looked up by name, its ``metadata.uid`` is used to find the
    single matching workload through the Kueue job-uid label, and that
    workload's ``spec.active`` flag is then patched.

    Args:
        patch_type: Requested operation; must be one of
            ``JobPatchType.get_values()``.
        job_name: Name of the job whose workload should be patched.
        namespace: Namespace of the job. ``None`` currently falls back to
            ``"default"`` (namespace discovery is disabled, see TODO below).

    Raises:
        SystemExit: With status 1 when the patch type is unsupported, the job
            has no uid, the workload lookup does not return exactly one
            workload, or that workload has no name.
    """
    # Use ``sys.exit`` rather than the ``exit`` helper: the latter is injected
    # by the ``site`` module for interactive sessions and is not defined under
    # ``python -S`` or in frozen executables.
    import sys

    if patch_type not in JobPatchType.get_values():
        logger.error(f"Unsupported patch type: '{patch_type}'")
        sys.exit(1)

    if namespace is None:
        # Currently unused: this is the input for the namespace discovery call
        # that is disabled below. Kept so re-enabling it is a one-line change.
        resource_attributes_template = V1ResourceAttributes(
            verb="patch",
            group=KUEUE_CUSTOM_OBJECT_GROUP,
            resource=WORKLOAD_CUSTOM_OBJECT_PLURAL,
        )
        # TODO: Unblock this after better customer onboarding experience for Crescendo.
        #namespace = DiscoverNamespaces().discover_accessible_namespace(resource_attributes_template)
        namespace = "default"

    patch_type_enum = JobPatchType(patch_type)
    k8s_client = KubernetesClient()

    # Step 1: get the pytorch job definition, UID in metadata is what we need to fetch
    # the corresponding workload managed by kueue.
    # ``or {}`` tolerates a missing job object or an explicit ``metadata: None``
    # so both end up in the uid error below instead of an AttributeError.
    training_job = k8s_client.get_job(job_name, namespace) or {}
    job_metadata = training_job.get("metadata") or {}
    uid = job_metadata.get("uid")

    if uid is None:
        logger.error("Cannot patch the job because uid cannot be found in metadata.")
        sys.exit(1)

    # Step 2: get the workload by filtering on the uid retrieved in step 1. Exactly one
    # workload is expected; anything else makes it uncertain which workload should be
    # patched, so log an error and exit. More than one should never happen.
    workload_label_selector = f"{KUEUE_JOB_UID_LABEL_KEY}={uid}"
    workload_list = k8s_client.get_workload_by_label(workload_label_selector, namespace)
    workloads = workload_list.get("items") or []

    if not workloads:
        logger.error(f"No workload found for the job to be patched: '{job_name}'")
        sys.exit(1)
    if len(workloads) > 1:
        logger.error(f"Only exact one workload is expected to be found for job: '{job_name}', but found {len(workloads)}")
        sys.exit(1)

    workload_name = (workloads[0].get("metadata") or {}).get("name")
    if not workload_name:
        # Without a name there is nothing to address the patch request to.
        logger.error("Cannot patch the job because the workload name cannot be found in metadata.")
        sys.exit(1)

    # Step 3: Decide the patch body based on the job patch type specified in command
    if patch_type_enum == JobPatchType.SUSPEND:
        active, outcome = False, "suspended"
    elif patch_type_enum == JobPatchType.UNSUSPEND:
        active, outcome = True, "unsuspended"
    else:
        # A valid JobPatchType member that this command does not handle.
        logger.info("Found unsupported patch type. No operation is performed.")
        return

    patch_body = {"spec": {"active": active}}
    k8s_client.patch_workload(workload_name, namespace, patch_body)
    logger.info(f"Job {job_name} is {outcome}.")