in ray-on-gke/tpu/kuberay-tpu-webhook/main.go [336:371]
// checkWorkersMatchTopology reports whether a worker group's NumOfHosts agrees
// with the host count that getNumTPUHostsFromTopology derives from the group's
// "cloud.google.com/gke-tpu-topology" node selector and the chip count reported
// by getNumTPUChipsRequested.
//
// It returns:
//   - (true, nil) when containerRequestingTPUs reports false for the group's
//     containers, or when the derived host count equals NumOfHosts;
//   - (false, nil) when the derived host count differs from NumOfHosts;
//   - (false, err) when NumOfHosts is zero, the pod template has no containers,
//     the topology node selector is empty, getNumTPUChipsRequested returns
//     zero, or getNumTPUHostsFromTopology fails.
//
// clusterName and namespace are used for log context and are forwarded to
// getNumTPUHostsFromTopology.
func checkWorkersMatchTopology(clusterName string, namespace string, workerGroupSpec ray.WorkerGroupSpec) (bool, error) {
	// Every log line below identifies the RayCluster as "<namespace>/<name>".
	clusterKey := namespace + "/" + clusterName
	klog.V(1).InfoS("checkWorkersMatchTopology", "RayCluster", clusterKey, "workerGroup", workerGroupSpec.GroupName)

	// One TPU VM host corresponds to one Ray worker pod.
	hostCount := workerGroupSpec.NumOfHosts
	if hostCount == 0 {
		return false, errors.New("workerGroupSpec NumOfHosts not set")
	}

	podContainers := workerGroupSpec.Template.Spec.Containers
	if len(podContainers) == 0 {
		return false, errors.New("Container path not specified")
	}

	// Nothing to validate for worker groups that do not request TPUs.
	if !containerRequestingTPUs(podContainers...) {
		return true, nil
	}

	tpuTopology := workerGroupSpec.Template.Spec.NodeSelector["cloud.google.com/gke-tpu-topology"]
	klog.V(1).InfoS("checkWorkersMatchTopology", "RayCluster", clusterKey, "topology", tpuTopology, "NumOfHosts", hostCount)
	if tpuTopology == "" {
		topologyErr := errors.New("TPU topology not specified")
		klog.ErrorS(topologyErr, "checkWorkersMatchTopology", "RayCluster", clusterKey, "gke-tpu-topology", tpuTopology)
		return false, topologyErr
	}

	chips := getNumTPUChipsRequested(podContainers...)
	if chips == 0 {
		limitsErr := errors.New("Container does not set TPU limits")
		klog.ErrorS(limitsErr, "checkWorkersMatchTopology", "RayCluster", clusterKey, "gke-tpu-topology", tpuTopology)
		return false, limitsErr
	}

	wantHosts, err := getNumTPUHostsFromTopology(clusterName, workerGroupSpec.GroupName, namespace, tpuTopology, chips)
	if err != nil {
		return false, err
	}

	// A mismatch is reported through the boolean result, not as an error.
	return wantHosts == hostCount, nil
}