in torchx/schedulers/kubernetes_scheduler.py [0:0]
def app_to_resource(app: AppDef, queue: str) -> Dict[str, object]:
    """
    app_to_resource creates a volcano job kubernetes resource definition from
    the provided AppDef. The resource definition can be used to launch the
    app on Kubernetes.

    To support macros we generate one task per replica instead of using the
    volcano `replicas` field since macros change the arguments on a per
    replica basis.

    Volcano has two levels of retries: one at the task level and one at the
    job level. The task level ``maxRetry`` and ``policies`` are only set for
    roles with ``max_retries > 0``. The job level ``maxRetry`` is always set
    to the minimum ``max_retries`` across all roles, or ``0`` when the app
    has no roles.

    Args:
        app: the application definition to convert.
        queue: name of the volcano queue the job is submitted to.

    Returns:
        A dict describing a ``batch.volcano.sh/v1alpha1`` ``Job`` with one
        task per role replica.

    Raises:
        KeyError: if a role with retries enabled has a ``retry_policy`` that
            is missing from ``RETRY_POLICIES``.
    """
    tasks = []
    # A single unique id is shared by the job name and the macro values of
    # every replica.
    unique_app_id = cleanup_str(make_unique(app.name))
    for role_idx, role in enumerate(app.roles):
        for replica_id in range(role.num_replicas):
            values = macros.Values(
                img_root="",
                app_id=unique_app_id,
                replica_id=str(replica_id),
            )
            name = cleanup_str(f"{role.name}-{replica_id}")
            # Macros are substituted per replica, hence one task per replica.
            replica_role = values.apply(role)
            pod = role_to_pod(name, replica_role)
            pod.metadata.labels.update(pod_labels(app, role_idx, role, replica_id))
            task: Dict[str, Any] = {
                "replicas": 1,
                "name": name,
                "template": pod,
            }
            if role.max_retries > 0:
                task["maxRetry"] = role.max_retries
                task["policies"] = RETRY_POLICIES[role.retry_policy]
                msg = f"""
Role {role.name} configured with restarts: {role.max_retries}. As of 1.4.0 Volcano
does NOT support retries correctly. More info: https://github.com/volcano-sh/volcano/issues/1651
"""
                warnings.warn(msg)
            tasks.append(task)

    # `default=0` keeps an app without roles from failing with an opaque
    # "min() arg is an empty sequence" error; such a job gets no job-level
    # retries and an empty task list.
    job_retries = min((role.max_retries for role in app.roles), default=0)
    resource: Dict[str, object] = {
        "apiVersion": "batch.volcano.sh/v1alpha1",
        "kind": "Job",
        "metadata": {"name": f"{unique_app_id}"},
        "spec": {
            "schedulerName": "volcano",
            "queue": queue,
            "tasks": tasks,
            "maxRetry": job_retries,
            "plugins": {
                "svc": [],
                "env": [],
            },
        },
    }
    return resource