in tpu-provisioner/internal/cloud/gke.go [84:165]
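// EnsureNodePoolForPod makes sure a node pool satisfying the given pod exists:
// it returns early if a matching pool is already present, deletes a mismatched
// pool so a later reconcile can recreate it, and otherwise creates a new pool.
// The why argument is a human-readable reason surfaced in Kubernetes events.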
func (g *GKE) EnsureNodePoolForPod(p *corev1.Pod, why string) error {
	np, err := g.nodePoolForPod(p)
	if err != nil {
		return fmt.Errorf("determining node pool for pod: %w", err)
	}

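	// Check whether a node pool with this name already exists in the cluster
	// and, if so, whether its configuration matches what the pod needs.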
	existingNPState, err := g.checkExistingNodePool(context.TODO(), np)
	if err != nil {
		return fmt.Errorf("checking if node pool exists: %w", err)
	}
	log.Info("Checked existing node pool state",
		"nodePoolName", np.Name, "existingNodePoolState", existingNPState.String(),
	)

	switch existingNPState {
	case nodePoolStateNotExists:
		// Fall through to create the node pool below.
	case nodePoolStateExistsAndMatches:
		// Nothing to do: a matching node pool already exists.
		return nil
	case nodePoolStateExistsAndNotMatches:
		// Recreate the node pool. The more specific reason below intentionally
		// shadows the caller-provided why.
		const why = "existing node pool did not match pod and needed to be recreated"
		if err := g.DeleteNodePool(np.Name, p, why); err != nil {
			return fmt.Errorf("failed to delete node pool: %s: %w", why, err)
		}
		// Allow another reconcile cycle to create the new node pool.
		return ErrNodePoolDeletedToBeRecreated
	case nodePoolStateExistsAndStopping:
		// The node pool is stopping; wait for it to be fully deleted before creating a new one.
		return ErrNodePoolStopping
	default:
		return fmt.Errorf("unexpected node pool state: %v", existingNPState)
	}

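	// No usable node pool exists at this point, so build a create request
	// against the cluster this provisioner manages.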
	req := &containerv1beta1.CreateNodePoolRequest{
		NodePool: np,
		Parent:   g.ClusterContext.ClusterName(),
	}

	// Due to concurrent reconciles, multiple creates for the same
	// node pool can occur at the same time. The result is an error:
	// "do: googleapi: Error 400: Cluster is running incompatible operation ..."
	// To avoid a burst of failed requests, we deduplicate here.
	if _, inProgress := g.inProgressCreatesNPName.Load(np.Name); inProgress {
		return fmt.Errorf("creation ongoing for node pool name: %v: %w", np.Name, ErrDuplicateRequest)
	}
	g.inProgressCreatesNPName.Store(np.Name, struct{}{})
	defer g.inProgressCreatesNPName.Delete(np.Name)

	// A restarting JobSet will trigger a new node pool creation.
	// The current creation attempt might overlap with the previous one,
	// which could still be ongoing, so we deduplicate on the job-key too.
	// This works because the job-key remains constant across restarts.
	// NOTE: These checks don't work across controller restarts.
	if jobKey := p.Labels[jobset.JobKey]; jobKey != "" {
		if _, inProgress := g.inProgressCreatesJobKey.Load(jobKey); inProgress {
			return fmt.Errorf("creation ongoing for job-key: %v: %w", jobKey, ErrDuplicateRequest)
		}
		g.inProgressCreatesJobKey.Store(jobKey, struct{}{})
		defer g.inProgressCreatesJobKey.Delete(jobKey)
	}

	// Get the JobSet this pod is part of from the pod labels and log it.
	jobSetName := p.Labels[jobset.JobSetNameKey]
	g.Recorder.Eventf(p, corev1.EventTypeNormal, EventNodePoolCreationStarted,
		"Starting creation of Node Pool %s (size = %v) for JobSet %s because %s",
		np.Name, np.InitialNodeCount, jobSetName, why)
	log.Info(fmt.Sprintf("creating node pool %s for jobset %s", np.Name, jobSetName))

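	// Kick off the (long-running) creation; the callbacks surface request
	// failures, operation failures, and success as events on the pod.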
	if err := g.NodePools.Create(context.TODO(), req, OpCallbacks{
		ReqFailure: func(err error) {
			g.Recorder.Eventf(p, corev1.EventTypeWarning, EventNodePoolCreationFailed,
				"Request to create Node Pool %s failed: %v.", np.Name, err)
		},
		OpFailure: func(err error) {
			g.Recorder.Eventf(p, corev1.EventTypeWarning, EventNodePoolCreationFailed,
				"Operation to create Node Pool %s failed: %v.", np.Name, err)
		},
		Success: func() {
			g.Recorder.Eventf(p, corev1.EventTypeNormal, EventNodePoolCreationSucceeded,
				"Successfully created Node Pool %s.", np.Name)
		},
	}); err != nil {
		return err
	}

	return nil
}
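
The deduplication above is the sync.Map-used-as-a-set pattern, keyed once by
node pool name and once by job-key. Below is a minimal standalone sketch of
that pattern; ensureOnce, errDuplicate, and the example key are illustrative
names, not part of the provisioner. The sketch uses LoadOrStore, which also
closes the small window between the separate Load and Store calls in the
function above, where two concurrent reconciles could in principle both pass
the check.

package main

import (
	"fmt"
	"sync"
)

// inProgress acts as a concurrent set: keys are names, values are struct{}{}.
var inProgress sync.Map

var errDuplicate = fmt.Errorf("duplicate request")

// ensureOnce runs create for name unless a creation is already in flight.
func ensureOnce(name string, create func() error) error {
	// LoadOrStore is atomic: exactly one caller claims the slot for name.
	if _, loaded := inProgress.LoadOrStore(name, struct{}{}); loaded {
		return fmt.Errorf("creation ongoing for %s: %w", name, errDuplicate)
	}
	// Free the slot when done, as the deferred Deletes do above.
	defer inProgress.Delete(name)
	return create()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// At most one creation runs at a time; callers that overlap
			// with an in-flight creation get errDuplicate instead.
			fmt.Println(ensureOnce("example-node-pool", func() error { return nil }))
		}()
	}
	wg.Wait()
}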