def validate_yaml_content()

in src/hyperpod_cli/validators/job_validator.py [0:0]


def validate_yaml_content(data):
    """Validate a parsed job config and report whether it can be submitted.

    The checks run in this order and stop at the first failure, which is
    reported through ``logger.error``:

    1. ``data["cluster"]`` exists.
    2. ``cluster["cluster_type"]`` equals ``"k8s"``.
    3. ``cluster["cluster_config"]`` exists.
    4. ``cluster_config["scheduler_type"]`` (defaulting to
       ``SchedulerType.get_default().value``) is one of
       ``SchedulerType.get_values()``.
    5. If the auto-resume annotation is truthy, the max-retry annotation
       must also be present.
    6. ``validate_scheduler_related_fields`` and
       ``validate_hyperpod_related_fields`` both return a truthy value.

    Side effect: ``cluster_config["label_selector"]`` is created if absent
    (or null). When neither its ``required`` nor ``preferred`` section
    carries a non-empty ``INSTANCE_TYPE_LABEL`` entry and an instance type
    is configured, ``required[INSTANCE_TYPE_LABEL]`` is set to a one-element
    list holding the instance type as a string.

    Args:
        data: Mapping produced by parsing the job YAML file; ``None`` (an
            empty document) is treated as a config without a 'cluster'
            section.

    Returns:
        bool: ``True`` when every check passes, ``False`` otherwise.
    """
    # An empty YAML document parses to None; report it as a missing
    # 'cluster' section instead of failing with AttributeError.
    cluster_fields = data.get("cluster") if data is not None else None
    if cluster_fields is None:
        logger.error("Please ensure 'cluster' field provided in the config file")
        return False

    # A missing cluster_type yields None, which also fails this comparison.
    if cluster_fields.get("cluster_type") != "k8s":
        logger.error("Only support 'k8s' cluster type currently.")
        return False

    cluster_config_fields = cluster_fields.get("cluster_config")
    if cluster_config_fields is None:
        logger.error(
            "Please ensure 'cluster' contains 'cluster_config' field in the config file"
        )
        return False

    custom_labels = cluster_config_fields.get("custom_labels")
    annotations = cluster_config_fields.get("annotations")
    namespace = cluster_config_fields.get("namespace")
    scheduler_type = cluster_config_fields.get(
        "scheduler_type", SchedulerType.get_default().value
    )

    supported_scheduler_types = SchedulerType.get_values()
    if scheduler_type not in supported_scheduler_types:
        logger.error(
            f"Unsupported scheduler type '{scheduler_type}', only {supported_scheduler_types} are allowed."
        )
        return False

    # NOTE(review): instance_type is read from 'cluster', not from
    # 'cluster_config' -- kept as in the original; confirm against the schema.
    instance_type = cluster_fields.get("instance_type")

    # A key written with an empty value in YAML parses to None, so normalise
    # None (not just "missing") to an empty mapping before reading from it.
    if cluster_config_fields.get("label_selector") is None:
        cluster_config_fields["label_selector"] = {}
    label_selector = cluster_config_fields["label_selector"]
    required_labels = label_selector.get("required") or {}
    preferred_labels = label_selector.get("preferred") or {}

    has_instance_type_label = bool(
        required_labels.get(INSTANCE_TYPE_LABEL)
        or preferred_labels.get(INSTANCE_TYPE_LABEL)
    )
    # Only inject a selector when there is a real instance type: str(None)
    # would otherwise add the literal label value "None" to the config.
    if not has_instance_type_label and instance_type is not None:
        if label_selector.get("required") is None:
            label_selector["required"] = {}
        label_selector["required"][INSTANCE_TYPE_LABEL] = [str(instance_type)]

    auto_resume = False
    max_retry = None
    if annotations is not None:
        auto_resume = annotations.get(HYPERPOD_AUTO_RESUME_ANNOTATION_KEY, False)
        max_retry = annotations.get(HYPERPOD_MAX_RETRY_ANNOTATION_KEY)
        if auto_resume and max_retry is None:
            logger.error(
                f"Please provide both '{HYPERPOD_AUTO_RESUME_ANNOTATION_KEY}' "
                f"and '{HYPERPOD_MAX_RETRY_ANNOTATION_KEY}' "
                f"annotations to use Auto Resume feature"
            )
            return False

    priority = cluster_config_fields.get("priority_class_name")
    restart_policy = cluster_config_fields.get("restartPolicy")

    queue_name = None
    workload_priority = None
    if custom_labels is not None:
        queue_name = custom_labels.get(KUEUE_QUEUE_NAME_LABEL_KEY)
        workload_priority = custom_labels.get(KUEUE_WORKLOAD_PRIORITY_CLASS_LABEL_KEY)

    if not validate_scheduler_related_fields(scheduler_type, namespace, workload_priority):
        return False

    if not validate_hyperpod_related_fields(
        instance_type,
        queue_name,
        priority,
        auto_resume,
        restart_policy,
        max_retry,
        namespace,
    ):
        return False

    return True