in tpu-provisioner/internal/cloud/gke.go [307:526]
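// nodePoolForPod builds the desired GKE node pool spec for a TPU Pod.
// It derives the pool's machine type, node count, labels, taints, reservation,
// and network configuration from the Pod's spec and annotations together with
// the cluster-level configuration carried in g.ClusterContext.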
func (g *GKE) nodePoolForPod(p *corev1.Pod) (*containerv1beta1.NodePool, error) {
ref := metav1.GetControllerOf(p)
if ref == nil {
// TODO: Allow for standalone Pods?
return nil, errors.New("no owner reference")
}
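// The JobSet name label is required: it is propagated onto the node pool
// labels below so nodes can be traced back to the owning JobSet.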
jobSetName := p.Labels[jobset.JobSetNameKey]
if jobSetName == "" {
// This should never be reached due to the event filters in the reconciler, but the check is kept just in case.
return nil, fmt.Errorf("pod %s is not part of a jobset, not constructing node pool config for it", p.Name)
}
labels := map[string]string{
// Used to keep track of what Node Pools this provisioner is responsible for.
LabelNodepoolManager: LabelNodepoolManagerTPUPodinator,
// Leave some breadcrumbs:
LabelParentKind: strings.ToLower(ref.Kind),
LabelParentName: strings.ToLower(ref.Name),
// Assuming a Namespaced parent here...
LabelParentNamespace: strings.ToLower(p.Namespace),
LabelJobSetName: jobSetName,
LabelJobSetNamespace: p.Namespace,
}
// Copy configured labels from the Pod to the Node.
for _, key := range g.ClusterContext.PodToNodeLabels {
if val, ok := p.Labels[key]; ok {
labels[key] = val
}
}
// Copy labels specified by annotation to the Node.
for _, key := range strings.Split(getAnnotation(p, AnnotationCopyLabels), ",") {
key = strings.TrimSpace(key)
if key == "" {
continue
}
if val, ok := p.Labels[key]; ok {
labels[key] = val
}
}
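// Copy node selectors from the Pod onto the node labels, skipping GCP/Google-
// prefixed keys; the ICI resiliency and location hint selectors are always copied.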
for labelKey, labelValue := range p.Spec.NodeSelector {
switch labelKey {
case ICIResiliencyLabel:
labels[labelKey] = labelValue
case LocationHintLabel:
labels[labelKey] = labelValue
default:
// Don't copy GCP/Google labels onto the node.
if !strings.HasPrefix(labelKey, gcpLabelPrefix) && !strings.HasPrefix(labelKey, googleLabelPrefix) {
labels[labelKey] = labelValue
}
}
}
// The Pod should already have been filtered on this node selector by this point.
tpuTopo, ok := p.Spec.NodeSelector[GKETPUNodeSelector]
if !ok {
return nil, fmt.Errorf("missing node selector key: %v", GKETPUNodeSelector)
}
accel, ok := p.Spec.NodeSelector[GKEAcceleratorNodeSelector]
if !ok {
return nil, fmt.Errorf("missing node selector key: %v", GKEAcceleratorNodeSelector)
}
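// Derive the node pool shape from the Pod: the total TPU resource request,
// the node count implied by the accelerator type and topology, and the
// corresponding machine type.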
tpuRequest, err := sumTPURequests(p)
if err != nil {
return nil, fmt.Errorf("summing TPU requests: %w", err)
}
nodeCount, err := tpuTopologyToNodeCount(accel, tpuTopo)
if err != nil {
return nil, fmt.Errorf("determining node count: %w", err)
}
machineType, err := tpuMachineType(accel, tpuRequest)
if err != nil {
return nil, fmt.Errorf("determining node count: %w", err)
}
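// Determine the capacity type. Unless the provisioner is configured to force
// on-demand capacity, honor the Pod's reservation and spot node selectors.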
var reservation *containerv1beta1.ReservationAffinity
var taints []*containerv1beta1.NodeTaint
var spot bool
if !g.ClusterContext.ForceOnDemand {
if resName, ok := p.Spec.NodeSelector["cloud.google.com/reservation-name"]; ok {
var resVal string
resProj, ok := p.Spec.NodeSelector["cloud.google.com/reservation-project"]
if ok {
resVal = fmt.Sprintf("projects/%s/reservations/%s", resProj, resName)
} else {
resVal = resName
}
reservation = &containerv1beta1.ReservationAffinity{
ConsumeReservationType: "SPECIFIC_RESERVATION",
Key: "compute.googleapis.com/reservation-name",
Values: []string{
resVal,
},
}
}
spot = p.Spec.NodeSelector["cloud.google.com/gke-spot"] == "true"
if spot {
// Add the taint that NAP would add.
// https://cloud.google.com/kubernetes-engine/docs/concepts/spot-vms#spotvms-nap
taints = append(taints, &containerv1beta1.NodeTaint{
Key: "cloud.google.com/gke-spot",
Value: "true",
Effect: "NO_SCHEDULE",
})
}
}
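// Optionally attach a secondary boot disk to be used as a container image cache.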
var secondaryDisks []*containerv1beta1.SecondaryBootDisk
if g.ClusterContext.NodeSecondaryDisk != "" {
secondaryDisks = []*containerv1beta1.SecondaryBootDisk{
{
// Example: "projects/my-gcp-project/global/images/my-disk-image"
DiskImage: g.ClusterContext.NodeSecondaryDisk,
Mode: "CONTAINER_IMAGE_CACHE",
},
}
}
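// Additional node networks default to the cluster-level setting and can be
// overridden per Pod via annotation. The value is a comma-separated list of
// "network:subnetwork" pairs, e.g. "vpc1:subnet1,vpc2:subnet2" yields two
// AdditionalNodeNetworkConfig entries.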
var networkConfig *containerv1beta1.NodeNetworkConfig
var additionalNodeNetworks []*containerv1beta1.AdditionalNodeNetworkConfig
// additional-node-networks: "vpc1:subnet1, vpc2:subnet2"
additionalNodeNetworksCSV := g.ClusterContext.NodeAdditionalNetworks
if getAnnotation(p, AnnotationAdditionalNodeNetworks) != "" {
additionalNodeNetworksCSV = getAnnotation(p, AnnotationAdditionalNodeNetworks)
}
for _, pair := range strings.Split(additionalNodeNetworksCSV, ",") {
pair = strings.TrimSpace(pair)
if pair == "" {
continue
}
netAndSubnet := strings.SplitN(pair, ":", 2)
if len(netAndSubnet) != 2 {
return nil, fmt.Errorf("invalid additional network annotation: %v", pair)
}
additionalNodeNetworks = append(additionalNodeNetworks, &containerv1beta1.AdditionalNodeNetworkConfig{
Network: strings.TrimSpace(netAndSubnet[0]),
Subnetwork: strings.TrimSpace(netAndSubnet[1]),
})
}
if len(additionalNodeNetworks) > 0 {
networkConfig = &containerv1beta1.NodeNetworkConfig{
AdditionalNodeNetworkConfigs: additionalNodeNetworks,
}
}
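// The node service account defaults to the cluster-level setting and can be
// overridden per Pod via annotation.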
nodeServiceAccount := g.ClusterContext.NodeServiceAccount
if sa, ok := p.Annotations[AnnotationNodeServiceAccount]; ok {
nodeServiceAccount = sa
}
// A placement policy is only valid in GKE for non-"1t" machine shapes.
placementPolicy := &containerv1beta1.PlacementPolicy{}
if !strings.HasSuffix(machineType, "1t") {
placementPolicy.TpuTopology = tpuTopo
placementPolicy.Type = "COMPACT"
}
diskType := g.ClusterContext.NodeDiskType
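// Derive the node pool name from the Pod (podToNodePoolName defines the naming scheme).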
name, err := podToNodePoolName(p)
if err != nil {
return nil, err
}
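// Assemble the node pool: a single-zone pool sized to the TPU topology, with
// auto-repair enabled, auto-upgrade disabled, and the labels, taints, and
// network settings computed above.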
np := &containerv1beta1.NodePool{
Name: name,
Config: &containerv1beta1.NodeConfig{
ServiceAccount: nodeServiceAccount,
ShieldedInstanceConfig: &containerv1beta1.ShieldedInstanceConfig{
EnableIntegrityMonitoring: true,
EnableSecureBoot: g.ClusterContext.NodeSecureBoot,
},
Tags: g.ClusterContext.NodeTags,
// NOTE: vendor/ was manually updated to include this field because it was
// not yet available at the time of writing.
SecondaryBootDisks: secondaryDisks,
MachineType: machineType,
ReservationAffinity: reservation,
Labels: labels,
Spot: spot,
Taints: taints,
BootDiskKmsKey: g.ClusterContext.NodeBootDiskKMSKey,
DiskType: diskType,
EnableConfidentialStorage: g.ClusterContext.NodeConfidentialStorage,
},
InitialNodeCount: int64(nodeCount),
Locations: []string{g.ClusterContext.NodeZone},
PlacementPolicy: placementPolicy,
Management: &containerv1beta1.NodeManagement{
AutoRepair: true,
AutoUpgrade: false,
},
UpgradeSettings: &containerv1beta1.UpgradeSettings{
MaxSurge: 1,
},
MaxPodsConstraint: &containerv1beta1.MaxPodsConstraint{MaxPodsPerNode: maxPodsPerNode},
NetworkConfig: networkConfig,
}
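// Record a hash of the node pool's configuration (as computed by
// nodePoolSelectiveHash) in a label, so an existing node pool can later be
// compared against the desired spec.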
hash, err := nodePoolSelectiveHash(np)
if err != nil {
return nil, fmt.Errorf("hashing node pool: %w", err)
}
np.Config.Labels[LabelNodePoolHash] = hash
return np, nil
}