in src/hyperpod_cli/validators/job_validator.py [0:0]
def validate_yaml_content(data):
    """Validate a parsed job config and default its instance-type label selector.

    The checks run in order and stop at the first failure, which is logged
    through ``logger.error``:

    1. ``data["cluster"]`` must be present.
    2. ``cluster["cluster_type"]`` must equal ``"k8s"``.
    3. ``cluster["cluster_config"]`` must be present.
    4. ``cluster_config["scheduler_type"]`` (defaulting to
       ``SchedulerType.get_default().value``) must be one of
       ``SchedulerType.get_values()``.
    5. If the auto-resume annotation is truthy, the max-retry annotation
       must also be provided.
    6. ``validate_scheduler_related_fields`` and
       ``validate_hyperpod_related_fields`` must both return a truthy value.

    Side effect: ``data`` is modified in place. ``cluster_config`` always
    ends up with a ``label_selector`` mapping, and when neither its
    ``required`` nor ``preferred`` section already names
    ``INSTANCE_TYPE_LABEL``, ``cluster["instance_type"]`` is written into
    ``label_selector["required"]`` as a single-element list. Nothing is
    injected when ``instance_type`` is absent.

    Args:
        data: Mapping loaded from the job config file.

    Returns:
        ``True`` if every check passes, ``False`` otherwise.
    """
    cluster_fields = data.get("cluster")
    if cluster_fields is None:
        logger.error("Please ensure 'cluster' field provided in the config file")
        return False

    # A missing cluster type is None, which is also != "k8s".
    if cluster_fields.get("cluster_type") != "k8s":
        logger.error("Only support 'k8s' cluster type currently.")
        return False

    cluster_config_fields = cluster_fields.get("cluster_config")
    if cluster_config_fields is None:
        logger.error(
            "Please ensure 'cluster' contains 'cluster_config' field in the config file"
        )
        return False

    custom_labels = cluster_config_fields.get("custom_labels")
    annotations = cluster_config_fields.get("annotations")
    namespace = cluster_config_fields.get("namespace")

    scheduler_type = cluster_config_fields.get(
        "scheduler_type", SchedulerType.get_default().value
    )
    if scheduler_type not in SchedulerType.get_values():
        logger.error(
            f"Unsupported scheduler type '{scheduler_type}', only {SchedulerType.get_values()} are allowed."
        )
        return False

    instance_type = cluster_fields.get("instance_type", None)

    queue_name = None
    workload_priority = None
    if custom_labels is not None:
        queue_name = custom_labels.get(KUEUE_QUEUE_NAME_LABEL_KEY, None)
        workload_priority = custom_labels.get(
            KUEUE_WORKLOAD_PRIORITY_CLASS_LABEL_KEY, None
        )

    # A key written with no value in YAML loads as None, and
    # dict.setdefault() would hand that None back; treat it like a missing key.
    label_selector = cluster_config_fields.get("label_selector")
    if label_selector is None:
        label_selector = {}
        cluster_config_fields["label_selector"] = label_selector
    required_labels = label_selector.get("required") or {}
    preferred_labels = label_selector.get("preferred") or {}

    selector_names_instance_type = bool(
        required_labels.get(INSTANCE_TYPE_LABEL)
        or preferred_labels.get(INSTANCE_TYPE_LABEL)
    )
    # Only default the selector from a real instance type; str(None) would
    # otherwise store the literal "None" as the required instance type.
    if not selector_names_instance_type and instance_type is not None:
        if label_selector.get("required") is None:
            label_selector["required"] = {}
        label_selector["required"][INSTANCE_TYPE_LABEL] = [str(instance_type)]

    auto_resume = False
    max_retry = None
    if annotations is not None:
        auto_resume = annotations.get(HYPERPOD_AUTO_RESUME_ANNOTATION_KEY, False)
        max_retry = annotations.get(HYPERPOD_MAX_RETRY_ANNOTATION_KEY, None)
        if auto_resume and max_retry is None:
            logger.error(
                f"Please provide both '{HYPERPOD_AUTO_RESUME_ANNOTATION_KEY}' "
                f"and '{HYPERPOD_MAX_RETRY_ANNOTATION_KEY}' "
                f"annotations to use Auto Resume feature"
            )
            return False

    priority = cluster_config_fields.get("priority_class_name", None)
    restart_policy = cluster_config_fields.get("restartPolicy", None)

    if not validate_scheduler_related_fields(
        scheduler_type, namespace, workload_priority
    ):
        return False

    if not validate_hyperpod_related_fields(
        instance_type,
        queue_name,
        priority,
        auto_resume,
        restart_policy,
        max_retry,
        namespace,
    ):
        return False

    return True