tpu-provisioner/internal/controller/nodepool_garbage_collector.go (77 lines of code) (raw):

package controller

import (
	"context"
	"fmt"
	"time"

	"github.com/GoogleCloudPlatform/ai-on-gke/tpu-provisioner/internal/cloud"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// NodePoolGarbageCollector deletes node pools that have no Nodes,
// are in an errored state, and where the JobSet that the node pool was
// created for no longer exists (the deletion reconciler would not see
// these b/c there are no Node objects).
type NodePoolGarbageCollector struct {
	// Interval is the period between garbage collection sweeps. It must be
	// positive: Run hands it to time.NewTicker, which panics otherwise.
	Interval time.Duration
	// Client is used to look up JobSets and to list Nodes.
	client.Client
	// Provider lists and deletes the node pools.
	Provider cloud.Provider
}

// Run executes one garbage collection sweep per Interval tick and blocks
// until ctx is done. The first sweep happens after the first tick, not
// immediately on entry.
func (g *NodePoolGarbageCollector) Run(ctx context.Context) {
	t := time.NewTicker(g.Interval)
	defer t.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			g.sweep(ctx)
		}
	}
}

// sweep lists all node pools once and deletes every pool that is in an error
// state, references a JobSet that no longer exists, and has no Nodes
// registered for it. Failures are logged and the affected pool is skipped, so
// it gets re-evaluated on the next sweep.
func (g *NodePoolGarbageCollector) sweep(ctx context.Context) {
	log := ctrllog.Log.WithName("nodepool-garbage-collector")
	log.Info("starting node pool garbage collection loop")

	nodepools, err := g.Provider.ListNodePools()
	if err != nil {
		log.Error(err, "failed to list node pools")
		return
	}

	for _, np := range nodepools {
		// Only errored node pools are candidates; skip the rest before
		// doing any per-pool work.
		if !np.Error {
			continue
		}

		npLog := log.WithValues(
			"nodepool", np.Name,
			"createdForJobSetName", np.CreatedForJobSet.Name,
			"createdForJobSetNamespace", np.CreatedForJobSet.Namespace,
		)

		// Without a complete JobSet reference there is no way to tell
		// whether the owner is gone, so leave the node pool alone.
		if np.CreatedForJobSet.Name == "" || np.CreatedForJobSet.Namespace == "" {
			npLog.Info("skipping garbage collection of node pool, no JobSet reference")
			continue
		}

		// Check if the JobSet that the node pool was created for still exists.
		err := g.Get(ctx, np.CreatedForJobSet, &jobset.JobSet{})
		if err == nil {
			npLog.Info("skipping garbage collection of node pool, JobSet still exists",
				"jobSetName", np.CreatedForJobSet.Name,
				"jobSetNamespace", np.CreatedForJobSet.Namespace,
			)
			continue
		}
		if client.IgnoreNotFound(err) != nil {
			// Any error other than NotFound is inconclusive: do not delete.
			npLog.Error(err, "failed to get JobSet node pool was created for")
			continue
		}

		// JobSet not found if this point is reached.

		// Ignore node pools that have Nodes registered for them (these will
		// be handled by the deletion controller).
		var nodes v1.NodeList
		if err := g.List(ctx, &nodes, client.MatchingLabels{g.Provider.NodePoolLabelKey(): np.Name}); err != nil {
			npLog.Error(err, "failed to list nodes for node pool")
			continue
		}
		if len(nodes.Items) > 0 {
			npLog.Info("skipping garbage collection of node pool, nodes exist")
			continue
		}

		npLog.Info("garbage collecting node pool in error state")

		whyDelete := fmt.Sprintf(
			"the node pool has no corresponding Nodes, the JobSet (%s/%s) that triggered its creation no longer exists, and node pool is in an error state: %s",
			np.CreatedForJobSet.Namespace, np.CreatedForJobSet.Name, np.Message,
		)

		// TODO: Lookup namespace from env with downward API.
		// NOTE(review): the Namespace object is presumably what the provider
		// attaches the deletion event to — confirm against DeleteNodePool.
		eventObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "tpu-provisioner-system"}}
		if err := g.Provider.DeleteNodePool(np.Name, eventObj, whyDelete); err != nil {
			npLog.Error(err, "failed to garbage collect node pool")
			continue
		}
	}
}