func (g *GKE) nodePoolForPod(p *corev1.Pod) (*containerv1beta1.NodePool, error)

in tpu-provisioner/internal/cloud/gke.go [307:526]


func (g *GKE) nodePoolForPod(p *corev1.Pod) (*containerv1beta1.NodePool, error) {
	ref := metav1.GetControllerOf(p)
	if ref == nil {
		// TODO: Allow for standalone Pods?
		return nil, errors.New("no owner reference")
	}

	jobSetName := p.Labels[jobset.JobSetNameKey]
	if jobSetName == "" {
		// This should never be reached due to the event filters in the reconciler, but it is checked just in case.
		return nil, fmt.Errorf("pod %s is not part of a jobset, not constructing node pool config for it", p.Name)
	}

	labels := map[string]string{
		// Used to keep track of what Node Pools this provisioner is responsible for.
		LabelNodepoolManager: LabelNodepoolManagerTPUPodinator,

		// Leave some bread crumbs:
		LabelParentKind: strings.ToLower(ref.Kind),
		LabelParentName: strings.ToLower(ref.Name),
		// Assuming a Namespaced parent here...
		LabelParentNamespace: strings.ToLower(p.Namespace),

		LabelJobSetName:      jobSetName,
		LabelJobSetNamespace: p.Namespace,
	}

	// Copy configured labels from the Pod to the Node.
	for _, key := range g.ClusterContext.PodToNodeLabels {
		if val, ok := p.Labels[key]; ok {
			labels[key] = val
		}
	}

	// Copy labels specified by annotation to the Node.
	for _, key := range strings.Split(getAnnotation(p, AnnotationCopyLabels), ",") {
		key = strings.TrimSpace(key)
		if key == "" {
			continue
		}
		if val, ok := p.Labels[key]; ok {
			labels[key] = val
		}
	}

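	// Copy the Pod's node selectors onto the node labels. ICI resiliency and
	// location hint selectors are always copied; other GCP/Google-prefixed
	// selectors are skipped.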
	for labelKey, labelValue := range p.Spec.NodeSelector {
		switch labelKey {
		case ICIResiliencyLabel:
			labels[labelKey] = labelValue
		case LocationHintLabel:
			labels[labelKey] = labelValue
		default:
			// Don't copy GCP/Google labels onto the node.
			if !strings.HasPrefix(labelKey, gcpLabelPrefix) && !strings.HasPrefix(labelKey, googleLabelPrefix) {
				labels[labelKey] = labelValue
			}
		}
	}

	// The Pod should already have been filtered on this node selector by this point.
	tpuTopo, ok := p.Spec.NodeSelector[GKETPUNodeSelector]
	if !ok {
		return nil, fmt.Errorf("missing node selector key: %v", GKETPUNodeSelector)
	}
	accel, ok := p.Spec.NodeSelector[GKEAcceleratorNodeSelector]
	if !ok {
		return nil, fmt.Errorf("missing node selector key: %v", GKEAcceleratorNodeSelector)
	}
	tpuRequest, err := sumTPURequests(p)
	if err != nil {
		return nil, fmt.Errorf("summing TPU requests: %w", err)
	}

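	// Derive the node pool shape: the node count from the accelerator type and TPU
	// topology, and the machine type from the accelerator type and summed TPU request.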
	nodeCount, err := tpuTopologyToNodeCount(accel, tpuTopo)
	if err != nil {
		return nil, fmt.Errorf("determining node count: %w", err)
	}
	machineType, err := tpuMachineType(accel, tpuRequest)
	if err != nil {
		return nil, fmt.Errorf("determining machine type: %w", err)
	}

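	// Reservation affinity and spot settings from the Pod's node selectors are
	// honored only when the provisioner is not forced to use on-demand capacity.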
	var reservation *containerv1beta1.ReservationAffinity
	var taints []*containerv1beta1.NodeTaint
	var spot bool

	if !g.ClusterContext.ForceOnDemand {
		if resName, ok := p.Spec.NodeSelector["cloud.google.com/reservation-name"]; ok {
			var resVal string
			resProj, ok := p.Spec.NodeSelector["cloud.google.com/reservation-project"]
			if ok {
				resVal = fmt.Sprintf("projects/%s/reservations/%s", resProj, resName)
			} else {
				resVal = resName
			}
			reservation = &containerv1beta1.ReservationAffinity{
				ConsumeReservationType: "SPECIFIC_RESERVATION",
				Key:                    "compute.googleapis.com/reservation-name",
				Values: []string{
					resVal,
				},
			}
		}

		spot = p.Spec.NodeSelector["cloud.google.com/gke-spot"] == "true"
		if spot {
			// Add the taint that NAP would add.
			// https://cloud.google.com/kubernetes-engine/docs/concepts/spot-vms#spotvms-nap
			taints = append(taints, &containerv1beta1.NodeTaint{
				Key:    "cloud.google.com/gke-spot",
				Value:  "true",
				Effect: "NO_SCHEDULE",
			})
		}
	}

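	// Optionally attach a secondary boot disk to serve as a container image cache.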
	var secondaryDisks []*containerv1beta1.SecondaryBootDisk
	if g.ClusterContext.NodeSecondaryDisk != "" {
		secondaryDisks = []*containerv1beta1.SecondaryBootDisk{
			{
				// Example: "projects/my-gcp-project/global/images/my-disk-image"
				DiskImage: g.ClusterContext.NodeSecondaryDisk,
				Mode:      "CONTAINER_IMAGE_CACHE",
			},
		}
	}

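	// Additional node networks default to the cluster-level configuration and can
	// be overridden per Pod via the AnnotationAdditionalNodeNetworks annotation.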
	var networkConfig *containerv1beta1.NodeNetworkConfig
	var additionalNodeNetworks []*containerv1beta1.AdditionalNodeNetworkConfig
	// additional-node-networks is a CSV of network:subnetwork pairs, e.g. "vpc1:subnet1, vpc2:subnet2".
	additionalNodeNetworksCSV := g.ClusterContext.NodeAdditionalNetworks
	if getAnnotation(p, AnnotationAdditionalNodeNetworks) != "" {
		additionalNodeNetworksCSV = getAnnotation(p, AnnotationAdditionalNodeNetworks)
	}
	for _, pair := range strings.Split(additionalNodeNetworksCSV, ",") {
		pair = strings.TrimSpace(pair)
		if pair == "" {
			continue
		}

		netAndSubnet := strings.SplitN(pair, ":", 2)
		if len(netAndSubnet) != 2 {
			return nil, fmt.Errorf("invalid additional network annotation: %v", pair)
		}

		additionalNodeNetworks = append(additionalNodeNetworks, &containerv1beta1.AdditionalNodeNetworkConfig{
			Network:    strings.TrimSpace(netAndSubnet[0]),
			Subnetwork: strings.TrimSpace(netAndSubnet[1]),
		})
	}
	if len(additionalNodeNetworks) > 0 {
		networkConfig = &containerv1beta1.NodeNetworkConfig{
			AdditionalNodeNetworkConfigs: additionalNodeNetworks,
		}
	}

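	// The node service account defaults to the cluster-level setting and can be
	// overridden per Pod via the AnnotationNodeServiceAccount annotation.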
	nodeServiceAccount := g.ClusterContext.NodeServiceAccount
	if sa, ok := p.Annotations[AnnotationNodeServiceAccount]; ok {
		nodeServiceAccount = sa
	}

	// Placement policy is only valid in GKE for non-"1t" machine shapes.
	placementPolicy := &containerv1beta1.PlacementPolicy{}
	if !strings.HasSuffix(machineType, "1t") {
		placementPolicy.TpuTopology = tpuTopo
		placementPolicy.Type = "COMPACT"
	}

	var diskType string
	if g.ClusterContext.NodeDiskType != "" {
		diskType = g.ClusterContext.NodeDiskType
	}

	name, err := podToNodePoolName(p)
	if err != nil {
		return nil, err
	}

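	// Assemble the node pool spec from the values computed above.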
	np := &containerv1beta1.NodePool{
		Name: name,
		Config: &containerv1beta1.NodeConfig{
			ServiceAccount: nodeServiceAccount,
			ShieldedInstanceConfig: &containerv1beta1.ShieldedInstanceConfig{
				EnableIntegrityMonitoring: true,
				EnableSecureBoot:          g.ClusterContext.NodeSecureBoot,
			},
			Tags: g.ClusterContext.NodeTags,
			// NOTE: vendor/ was manually updated to include this field because it
			// was not yet available in the client library at the time of writing.
			SecondaryBootDisks:        secondaryDisks,
			MachineType:               machineType,
			ReservationAffinity:       reservation,
			Labels:                    labels,
			Spot:                      spot,
			Taints:                    taints,
			BootDiskKmsKey:            g.ClusterContext.NodeBootDiskKMSKey,
			DiskType:                  diskType,
			EnableConfidentialStorage: g.ClusterContext.NodeConfidentialStorage,
		},
		InitialNodeCount: int64(nodeCount),
		Locations:        []string{g.ClusterContext.NodeZone},
		PlacementPolicy:  placementPolicy,
		Management: &containerv1beta1.NodeManagement{
			AutoRepair:  true,
			AutoUpgrade: false,
		},
		UpgradeSettings: &containerv1beta1.UpgradeSettings{
			MaxSurge: 1,
		},
		MaxPodsConstraint: &containerv1beta1.MaxPodsConstraint{MaxPodsPerNode: maxPodsPerNode},
		NetworkConfig:     networkConfig,
	}

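	// Label the node pool with a hash of selected fields; presumably used later to
	// detect when an existing node pool no longer matches the desired config.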
	hash, err := nodePoolSelectiveHash(np)
	if err != nil {
		return nil, fmt.Errorf("hashing node pool: %w", err)
	}
	np.Config.Labels[LabelNodePoolHash] = hash
	return np, nil
}