in ray-on-gke/tpu/kuberay-tpu-webhook/main.go [497:563]
// getSliceToWorkerIDs builds a mapping from each TPU slice of the given worker
// group to the TPU_WORKER_ID values already held by Pods of that slice.
//
// Pods are listed through t.podLister in namespace, selected by the
// "ray.io/cluster" and "ray.io/group" labels. A Pod contributes an entry only
// when it is not being deleted, carries a "replicaIndex" label whose segment
// after the last "-" parses as an integer, and has a TPU-requesting container
// whose TPU_WORKER_ID (as returned by getEnvironmentVariable) parses as an
// integer. Pods with a missing or malformed "replicaIndex" label are skipped;
// the malformed case is logged rather than being silently counted as replica 0.
//
// If any considered Pod does not request TPUs, the group is treated as a
// non-TPU worker group and the mapping accumulated so far is returned.
//
// An error is returned when listing Pods fails, or when a Running Pod with a
// valid "replicaIndex" label has no parsable TPU_WORKER_ID.
func (t *TPUWebhookServer) getSliceToWorkerIDs(clusterName string, groupName string, namespace string, numOfHosts int32) (map[slice][]int, error) {
	sliceToWorkerIDs := make(map[slice][]int)
	// we only care about workers in the same RayCluster and worker group when assigning IDs
	podsInGroup, err := t.podLister.Pods(namespace).List(labels.SelectorFromSet(labels.Set{"ray.io/cluster": clusterName, "ray.io/group": groupName}))
	if err != nil {
		return nil, err
	}
	if len(podsInGroup) == 0 {
		// return an empty mapping if no Pods with 'ray.io/group' label found
		return sliceToWorkerIDs, nil
	}
	klog.V(1).InfoS("getSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName, "# Pods in Group", len(podsInGroup))
	for _, existingPod := range podsInGroup {
		if existingPod.DeletionTimestamp != nil {
			// Pod is terminating; its worker ID is free to be reused
			continue
		}
		// check that Pods are in the same namespace
		if namespace != existingPod.Namespace {
			continue
		}
		if !containerRequestingTPUs(existingPod.Spec.Containers...) {
			// Pod does not request TPUs, 'ray.io/group' is not a TPU worker group
			return sliceToWorkerIDs, nil
		}
		replicaIndexLabel := existingPod.Labels["replicaIndex"]
		if replicaIndexLabel == "" {
			// Pod has not been intercepted by the KubeRay TPU webhook yet
			continue
		}
		// the replica index is the segment after the last "-" (the whole label if there is no "-")
		replicaIndexSuffix := replicaIndexLabel[strings.LastIndex(replicaIndexLabel, "-")+1:]
		existingReplicaIndex, err := strconv.Atoi(replicaIndexSuffix)
		if err != nil {
			// a malformed label must not be silently treated as replica 0,
			// otherwise this Pod's worker ID would be recorded against the wrong slice
			klog.ErrorS(err, "getSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName, "replicaIndex", replicaIndexLabel)
			continue
		}
		// -1 marks "no TPU_WORKER_ID found on any TPU container"
		existingWorkerID := -1
		for _, container := range existingPod.Spec.Containers {
			if !containerRequestingTPUs(container) {
				continue
			}
			tpuWorkerIDEnvVar := getEnvironmentVariable("TPU_WORKER_ID", container)
			tempVar, err := strconv.Atoi(tpuWorkerIDEnvVar)
			if err != nil {
				klog.ErrorS(err, "getSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName, "TPU_WORKER_ID", tpuWorkerIDEnvVar)
				continue
			}
			// the first TPU container with a valid ID decides the Pod's worker ID
			existingWorkerID = tempVar
			break
		}
		if existingPod.Status.Phase == "Running" && existingWorkerID == -1 {
			return nil, errors.New("existing TPU worker missing TPU_WORKER_ID")
		}
		if existingWorkerID != -1 {
			// Pod has been intercepted by the webhook
			podSlice := slice{clusterName, groupName, namespace, existingReplicaIndex, numOfHosts}
			// append handles the missing-key (nil slice) case on its own
			sliceToWorkerIDs[podSlice] = append(sliceToWorkerIDs[podSlice], existingWorkerID)
			klog.V(1).InfoS("getSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName, "ReplicaIndex", existingReplicaIndex, "TPU_WORKER_ID", existingWorkerID)
		}
	}
	return sliceToWorkerIDs, nil
}