func (g *GKE) EnsureNodePoolForPod(p *corev1.Pod, why string) error

in tpu-provisioner/internal/cloud/gke.go [84:165]

func (g *GKE) EnsureNodePoolForPod(p *corev1.Pod, why string) error {
	np, err := g.nodePoolForPod(p)
	if err != nil {
		return fmt.Errorf("determining node pool for pod: %w", err)
	}

	existingNPState, err := g.checkExistingNodePool(context.TODO(), np)
	if err != nil {
		return fmt.Errorf("checking if node pool exists: %w", err)
	}
	log.Info("Checked existing node pool state",
		"nodePoolName", np.Name, "existingNodePoolState", existingNPState.String(),
	)
	switch existingNPState {
	case nodePoolStateNotExists:
		// Create the node pool.
	case nodePoolStateExistsAndMatches:
		return nil
	case nodePoolStateExistsAndNotMatches:
		// Recreate the node pool.
		const why = "existing node pool did not match pod and needed to be recreated"
		if err := g.DeleteNodePool(np.Name, p, why); err != nil {
			return fmt.Errorf("failed to delete node pool: %s: %w", why, err)
		}
		// Allow another reconcile cycle to create the new node pool.
		return ErrNodePoolDeletedToBeRecreated
	case nodePoolStateExistsAndStopping:
		// Node pool is stopping, so we need to wait for it to be deleted before creating a new one.
		return ErrNodePoolStopping
	default:
		return fmt.Errorf("unexpected node pool state: %v", existingNPState)
	}
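	// The states handled above are assumed to come from an unexported enum
	// of roughly this shape (its actual definition lives elsewhere in the
	// package, alongside a String() method):
	//
	//	type nodePoolState int
	//	const (
	//		nodePoolStateNotExists nodePoolState = iota
	//		nodePoolStateExistsAndMatches
	//		nodePoolStateExistsAndNotMatches
	//		nodePoolStateExistsAndStopping
	//	)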

	req := &containerv1beta1.CreateNodePoolRequest{
		NodePool: np,
		Parent:   g.ClusterContext.ClusterName(),
	}
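	// The GKE API expects Parent in the form
	// "projects/*/locations/*/clusters/*"; ClusterContext.ClusterName() is
	// assumed to build that string.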

	// Due to concurrent reconciles, multiple creates for the same
	// Node Pool will occur at the same time. The result is an error:
	// "do: googleapi: Error 400: Cluster is running incompatible operation ..."
	// To avoid a burst of failed requests, we deduplicate here.
	if _, inProgress := g.inProgressCreatesNPName.Load(np.Name); inProgress {
		return fmt.Errorf("creation ongoing for node pool name: %v: %w", np.Name, ErrDuplicateRequest)
	}
	g.inProgressCreatesNPName.Store(np.Name, struct{}{})
	defer g.inProgressCreatesNPName.Delete(np.Name)

	// A restarting JobSet will trigger a new Node Pool creation.
	// The current creation attempt might overlap with the previous one,
	// which could still be ongoing, so we need to deduplicate.
	// This works because job-key remains constant across restarts.
	// NOTE: These checks don't survive controller restarts, since the maps are in-memory.
	if jobKey := p.Labels[jobset.JobKey]; jobKey != "" {
		if _, inProgress := g.inProgressCreatesJobKey.Load(jobKey); inProgress {
			return fmt.Errorf("creation ongoing for job-key: %v: %w", jobKey, ErrDuplicateRequest)
		}
		g.inProgressCreatesJobKey.Store(jobKey, struct{}{})
		defer g.inProgressCreatesJobKey.Delete(jobKey)
	}
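	// Both in-progress indexes used above are assumed to be sync.Map fields
	// on GKE, used as sets:
	//
	//	inProgressCreatesNPName sync.Map // keys: node pool names
	//	inProgressCreatesJobKey sync.Map // keys: job-key label values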

	// Read the name of the JobSet this pod belongs to from its labels, for event recording and logging.
	jobSetName := p.Labels[jobset.JobSetNameKey]
	g.Recorder.Eventf(p, corev1.EventTypeNormal, EventNodePoolCreationStarted, "Starting creation of Node Pool %s (size = %v) for JobSet %s because %s", np.Name, np.InitialNodeCount, jobSetName, why)
	log.Info(fmt.Sprintf("creating node pool %s for jobset %s", np.Name, jobSetName))

	return g.NodePools.Create(context.TODO(), req, OpCallbacks{
		ReqFailure: func(err error) {
			g.Recorder.Eventf(p, corev1.EventTypeWarning, EventNodePoolCreationFailed, "Request to create Node Pool %s failed: %v.", np.Name, err)
		},
		OpFailure: func(err error) {
			g.Recorder.Eventf(p, corev1.EventTypeWarning, EventNodePoolCreationFailed, "Operation to create Node Pool %s failed: %v.", np.Name, err)
		},
		Success: func() {
			g.Recorder.Eventf(p, corev1.EventTypeNormal, EventNodePoolCreationSucceeded, "Successfully created Node Pool %s.", np.Name)
		},
	})
}
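
For context, callers are expected to treat the sentinel errors above as retryable conditions rather than failures. The handler below is a minimal sketch of that pattern assuming a controller-runtime reconciler; the PodReconciler type, its Cloud field, the requeue interval, and the module import path are all hypothetical, not the repository's actual controller code.

package controller

import (
	"context"
	"errors"
	"time"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"

	// Hypothetical module path; substitute the repo's actual module.
	"example.com/tpu-provisioner/internal/cloud"
)

// PodReconciler is a hypothetical caller of EnsureNodePoolForPod.
type PodReconciler struct {
	Cloud *cloud.GKE
}

func (r *PodReconciler) reconcile(_ context.Context, pod *corev1.Pod) (ctrl.Result, error) {
	err := r.Cloud.EnsureNodePoolForPod(pod, "pod is pending and unschedulable")
	switch {
	case err == nil:
		return ctrl.Result{}, nil
	case errors.Is(err, cloud.ErrDuplicateRequest):
		// A concurrent reconcile already started this creation; back off and retry.
		return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
	case errors.Is(err, cloud.ErrNodePoolStopping), errors.Is(err, cloud.ErrNodePoolDeletedToBeRecreated):
		// The old node pool is still being deleted; requeue so a later
		// cycle can create its replacement.
		return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
	default:
		return ctrl.Result{}, err
	}
}

Requeueing with a delay instead of returning the error keeps these expected, self-resolving conditions from being counted as reconcile failures and from triggering exponential backoff.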