func()

in pkg/controllers/instance/garbagecollection/controller.go [51:124]


// Reconcile garbage-collects leaked cloudprovider instances — instances whose
// backing NodeClaim no longer exists in the cluster — and deletes any Node
// objects still pointing at them so their finalization flow runs.
func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
	ctx = injection.WithControllerName(ctx, "instance.garbagecollection")

	// Fetch every instance (agentpool) known to the cloud provider, keeping
	// only those not already being torn down.
	cloudClaims, err := c.cloudProvider.List(ctx)
	if err != nil {
		return reconcile.Result{}, err
	}
	cloudClaims = lo.Filter(cloudClaims, func(claim *v1.NodeClaim, _ int) bool {
		return claim.DeletionTimestamp.IsZero()
	})

	// Collect the names of every NodeClaim currently present in the cluster.
	kaitoClaims, err := nodeclaimutil.AllKaitoNodeClaims(ctx, c.kubeClient)
	if err != nil {
		return reconcile.Result{}, err
	}
	knownNames := sets.New[string](lo.FilterMap(kaitoClaims, func(claim v1.NodeClaim, _ int) (string, bool) {
		return claim.Name, true
	})...)

	// An instance is considered leaked when its NodeClaim is gone AND the
	// instance has existed for more than 30s — freshly created instances get
	// a grace period for their NodeClaim to show up.
	leaked := lo.Filter(cloudClaims, func(claim *v1.NodeClaim, _ int) bool {
		if knownNames.Has(claim.Name) {
			return false
		}
		created := claim.CreationTimestamp
		if !created.IsZero() && created.Time.Add(30*time.Second).After(time.Now()) {
			// Created less than 30 seconds ago; skip it for now.
			return false
		}
		return true
	})
	log.FromContext(ctx).Info("instance garbagecollection status", "garbaged instance count", len(leaked))

	// Delete the leaked instances (and their Nodes) with bounded parallelism;
	// each worker writes only its own errs slot, so no synchronization needed.
	errs := make([]error, len(leaked))
	workqueue.ParallelizeUntil(ctx, 20, len(leaked), func(i int) {
		claim := leaked[i]
		if err := c.cloudProvider.Delete(ctx, claim); err != nil {
			log.FromContext(ctx).Error(err, "failed to delete leaked cloudprovider instance", "instance", claim.Name)
			// Not-found means someone else already cleaned it up — not an error.
			errs[i] = cloudprovider.IgnoreNodeClaimNotFoundError(err)
			return
		}
		log.FromContext(ctx).Info("delete leaked cloudprovider instance successfully", "name", claim.Name)

		// Without a ProviderID there is no way to match Nodes to this instance.
		if len(claim.Status.ProviderID) == 0 {
			return
		}
		nodes, err := nodeclaimutil.AllNodesForNodeClaim(ctx, c.kubeClient, claim)
		if err != nil {
			errs[i] = err
			return
		}

		nodeErrs := make([]error, len(nodes))
		for j := range nodes {
			// A Node already marked as terminating needs no further Delete call.
			if !nodes[j].DeletionTimestamp.IsZero() {
				continue
			}
			// Deleting the Node triggers its finalization and removal flow.
			if err := c.kubeClient.Delete(ctx, nodes[j]); client.IgnoreNotFound(err) != nil {
				log.FromContext(ctx).Error(err, "failed to delete leaked node", "node", nodes[j].Name)
				nodeErrs[j] = err
			} else {
				log.FromContext(ctx).Info("delete leaked node successfully", "name", nodes[j].Name)
			}
		}
		errs[i] = multierr.Combine(nodeErrs...)
	})

	return reconcile.Result{RequeueAfter: time.Minute * 2}, multierr.Combine(errs...)
}