in pkg/controllers/instance/garbagecollection/controller.go [51:124]
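// Reconcile garbage collects cloud provider instances whose NodeClaim no
// longer exists in the cluster, along with any Node objects they left behind.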
func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "instance.garbagecollection")
	// List all instances (agent pools) known to the cloud provider, represented as NodeClaims.
cloudNodeClaims, err := c.cloudProvider.List(ctx)
if err != nil {
return reconcile.Result{}, err
}
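	// Drop instances that are already being deleted so we do not issue duplicate deletes.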
cloudNodeClaims = lo.Filter(cloudNodeClaims, func(nc *v1.NodeClaim, _ int) bool {
return nc.DeletionTimestamp.IsZero()
})
kaitoNodeClaims, err := nodeclaimutil.AllKaitoNodeClaims(ctx, c.kubeClient)
if err != nil {
return reconcile.Result{}, err
}
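	// Build a set of the NodeClaim names currently present in the cluster for fast lookup.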
	clusterNodeClaimNames := sets.New[string](lo.Map(kaitoNodeClaims, func(nc v1.NodeClaim, _ int) string {
		return nc.Name
	})...)
	// An instance is leaked when its corresponding NodeClaim no longer exists in the
	// cluster and the instance was created more than 30 seconds ago; garbage collect
	// these leaked cloud provider instances and their nodes.
deletedCloudProviderInstances := lo.Filter(cloudNodeClaims, func(nc *v1.NodeClaim, _ int) bool {
if clusterNodeClaimNames.Has(nc.Name) {
return false
}
if !nc.CreationTimestamp.IsZero() {
			// The agent pool was created less than 30 seconds ago; give its
			// NodeClaim time to appear before declaring the instance leaked.
if nc.CreationTimestamp.Time.Add(30 * time.Second).After(time.Now()) {
return false
}
}
return true
})
log.FromContext(ctx).Info("instance garbagecollection status", "garbaged instance count", len(deletedCloudProviderInstances))
errs := make([]error, len(deletedCloudProviderInstances))
workqueue.ParallelizeUntil(ctx, 20, len(deletedCloudProviderInstances), func(i int) {
if err := c.cloudProvider.Delete(ctx, deletedCloudProviderInstances[i]); err != nil {
log.FromContext(ctx).Error(err, "failed to delete leaked cloudprovider instance", "instance", deletedCloudProviderInstances[i].Name)
errs[i] = cloudprovider.IgnoreNodeClaimNotFoundError(err)
return
}
log.FromContext(ctx).Info("delete leaked cloudprovider instance successfully", "name", deletedCloudProviderInstances[i].Name)
if len(deletedCloudProviderInstances[i].Status.ProviderID) != 0 {
nodes, err := nodeclaimutil.AllNodesForNodeClaim(ctx, c.kubeClient, deletedCloudProviderInstances[i])
if err != nil {
errs[i] = err
return
}
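			// Track per-node errors so one failed delete does not mask the others.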
subErrs := make([]error, len(nodes))
for k := range nodes {
				// If the Node still exists but is already terminating, there is no need to call Delete again.
if nodes[k].DeletionTimestamp.IsZero() {
// We delete nodes to trigger the node finalization and deletion flow
if err := c.kubeClient.Delete(ctx, nodes[k]); client.IgnoreNotFound(err) != nil {
log.FromContext(ctx).Error(err, "failed to delete leaked node", "node", nodes[k].Name)
subErrs[k] = err
} else {
						log.FromContext(ctx).Info("deleted leaked node", "name", nodes[k].Name)
}
}
}
errs[i] = multierr.Combine(subErrs...)
}
})
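	// Requeue on a fixed two-minute interval; garbage collection polls rather than reacting to events.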
return reconcile.Result{RequeueAfter: time.Minute * 2}, multierr.Combine(errs...)
}
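
For readability, the leak-detection predicate inlined in the lo.Filter above can be expressed as a standalone helper. The following is a minimal sketch, not code from the file: the isLeaked name is hypothetical, and it assumes the same imports the file already uses (time, k8s.io/apimachinery/pkg/util/sets, and the NodeClaim v1 API).

// isLeaked reports whether a cloud provider instance should be garbage
// collected: no NodeClaim with its name exists in the cluster, and the
// instance has outlived the 30-second creation grace period.
func isLeaked(nc *v1.NodeClaim, clusterNodeClaimNames sets.Set[string], now time.Time) bool {
	if clusterNodeClaimNames.Has(nc.Name) {
		return false // a matching NodeClaim still exists; not leaked
	}
	if !nc.CreationTimestamp.IsZero() && nc.CreationTimestamp.Time.Add(30*time.Second).After(now) {
		return false // still within the grace period; the NodeClaim may not have been created yet
	}
	return true
}

With this helper, the filter body reduces to a single call such as isLeaked(nc, clusterNodeClaimNames, time.Now()).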