func()

in ray-on-gke/tpu/kuberay-tpu-webhook/main.go [497:563]


// getSliceToWorkerIDs returns, for every slice of the given worker group, the
// TPU_WORKER_ID values already assigned to the Pods of that slice.
//
// Pods are listed through t.podLister in namespace, selected by the labels
// "ray.io/cluster"=clusterName and "ray.io/group"=groupName. A slice key is
// built from clusterName, groupName, namespace, the replica index parsed from
// the Pod's "replicaIndex" label (the integer after the last "-"), and
// numOfHosts.
//
// The following Pods are skipped and do not contribute to the result:
//   - Pods with a DeletionTimestamp set,
//   - Pods whose namespace differs from namespace,
//   - Pods without a "replicaIndex" label (not yet handled by the webhook),
//   - Pods whose "replicaIndex" label does not end in an integer (logged),
//   - non-Running Pods in which no TPU container has a parsable TPU_WORKER_ID.
//
// If any listed Pod has no container requesting TPUs, the group is treated as
// a non-TPU group and the mapping collected so far is returned with a nil
// error.
//
// An error is returned when listing Pods fails, or when a Running Pod has no
// TPU container with a parsable TPU_WORKER_ID environment variable.
func (t *TPUWebhookServer) getSliceToWorkerIDs(clusterName string, groupName string, namespace string, numOfHosts int32) (map[slice][]int, error) {
	sliceToWorkerIDs := make(map[slice][]int)

	// we only care about workers in the same RayCluster and worker group when assigning IDs
	podsInGroup, err := t.podLister.Pods(namespace).List(labels.SelectorFromSet(labels.Set{"ray.io/cluster": clusterName, "ray.io/group": groupName}))
	if err != nil {
		return nil, err
	}

	if len(podsInGroup) == 0 {
		// return an empty mapping if no Pods with 'ray.io/group' label found
		return sliceToWorkerIDs, nil
	}
	klog.V(1).InfoS("getSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName, "# Pods in Group", len(podsInGroup))
	for _, existingPod := range podsInGroup {
		if existingPod.DeletionTimestamp != nil {
			// Pod is terminating; its worker ID is about to be freed
			continue
		}
		// check that Pods are in the same namespace
		if namespace != existingPod.Namespace {
			continue
		}

		if !containerRequestingTPUs(existingPod.Spec.Containers...) {
			// Pod does not request TPUs, 'ray.io/group' is not a TPU worker group
			return sliceToWorkerIDs, nil
		}
		replicaIndexLabel := existingPod.Labels["replicaIndex"]
		if replicaIndexLabel == "" {
			// Pod has not been intercepted by the KubeRay TPU webhook yet
			continue
		}
		// The replica index is the integer after the last "-" in the label
		// (or the whole label when it contains no "-").
		replicaIndexSuffix := replicaIndexLabel[strings.LastIndex(replicaIndexLabel, "-")+1:]
		existingReplicaIndex, parseErr := strconv.Atoi(replicaIndexSuffix)
		if parseErr != nil {
			// A malformed label must not be silently counted as replica 0,
			// which would attribute this Pod's worker ID to the wrong slice.
			klog.ErrorS(parseErr, "getSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName, "replicaIndex", replicaIndexLabel)
			continue
		}

		// -1 marks "no TPU_WORKER_ID found"; the first TPU container with a
		// parsable value wins.
		existingWorkerID := -1
		for _, container := range existingPod.Spec.Containers {
			if !containerRequestingTPUs(container) {
				continue
			}

			tpuWorkerIDEnvVar := getEnvironmentVariable("TPU_WORKER_ID", container)
			workerID, convErr := strconv.Atoi(tpuWorkerIDEnvVar)
			if convErr != nil {
				klog.ErrorS(convErr, "getSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName, "TPU_WORKER_ID", tpuWorkerIDEnvVar)
				continue
			}
			existingWorkerID = workerID
			break
		}

		if existingWorkerID == -1 {
			if existingPod.Status.Phase == "Running" {
				return nil, errors.New("existing TPU worker missing TPU_WORKER_ID")
			}
			// Not running and no worker ID yet: nothing to record for this Pod
			continue
		}

		// Pod has been intercepted by the webhook; append works on a missing
		// map entry, so no nil check is needed.
		podSlice := slice{clusterName, groupName, namespace, existingReplicaIndex, numOfHosts}
		sliceToWorkerIDs[podSlice] = append(sliceToWorkerIDs[podSlice], existingWorkerID)
		klog.V(1).InfoS("getSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName, "ReplicaIndex", existingReplicaIndex, "TPU_WORKER_ID", existingWorkerID)
	}
	return sliceToWorkerIDs, nil
}