pkg/controller/common/cleaner.go (162 lines of code) (raw):

package common import ( "context" "errors" "fmt" "math" "time" "github.com/Azure/aks-app-routing-operator/pkg/controller/controllername" "github.com/Azure/aks-app-routing-operator/pkg/controller/metrics" "github.com/go-logr/logr" "github.com/hashicorp/go-multierror" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) type cleaner struct { name controllername.ControllerNamer mapper meta.RESTMapper clientset kubernetes.Interface dynamic dynamic.Interface logger logr.Logger retriever CleanTypeRetriever // gets the types of resources that will be cleaned maxRetries int } // NewCleaner creates a cleaner that attempts to delete resources with the labels specified and of the types returned by CleanTypeRetriever func NewCleaner(manager ctrl.Manager, name controllername.ControllerNamer, gvrRetriever CleanTypeRetriever) error { d, err := dynamic.NewForConfig(manager.GetConfig()) if err != nil { return fmt.Errorf("creating dynamic client: %w", err) } cs, err := kubernetes.NewForConfig(manager.GetConfig()) if err != nil { return fmt.Errorf("creating clientset: %w", err) } mapper, err := apiutil.NewDynamicRESTMapper(manager.GetConfig(), manager.GetHTTPClient()) if err != nil { return fmt.Errorf("creating dynamic rest mapper: %w", err) } c := &cleaner{ name: name, mapper: mapper, dynamic: d, logger: name.AddToLogger(manager.GetLogger()), clientset: cs, retriever: gvrRetriever, maxRetries: 2, } metrics.InitControllerMetrics(name) return manager.Add(c) } func (c *cleaner) Start(ctx context.Context) error { start := time.Now() c.logger.Info("starting to clean resources") defer func() { c.logger.Info("finished cleaning resources", "latencySec", time.Since(start).Seconds()) }() for i := 0; i <= c.maxRetries; i++ { err := c.Clean(ctx) if err == nil { return nil } c.logger.Error(err, "failed to clean resources", "try", i, "maxTries", c.maxRetries) if i == c.maxRetries { break // failing to clean up unused resources shouldn't crash operator } timeout := time.Duration(int(math.Pow(2, float64(i)))) * time.Second c.logger.Info("sleeping", "time", timeout) time.Sleep(timeout) } return nil } func (c *cleaner) Clean(ctx context.Context) error { var err error defer func() { //placing this call inside a closure allows for result and err to be bound after tick executes //this makes sure they have the proper value //just calling defer metrics.HandleControllerReconcileMetrics(controllerName, result, err) would bind //the values of result and err to their zero values, since they were just instantiated metrics.HandleControllerReconcileMetrics(c.name, ctrl.Result{}, err) }() if c.retriever == nil { err = errors.New("retriever is nil") return err } types, err := c.retriever(c.mapper) if err != nil { err = fmt.Errorf("retrieving gvr types: %w", err) return err } var result *multierror.Error for _, t := range types { if cleanTypeErr := c.CleanType(ctx, t); cleanTypeErr != nil { result = multierror.Append(result, fmt.Errorf("cleaning type %s with labels %s: %w", t.gvr.String(), t.labels, cleanTypeErr)) } } err = result.ErrorOrNil() return err } func (c *cleaner) CleanType(ctx context.Context, t cleanType) error { l := labels.Set(t.labels) selector, err := l.AsValidatedSelector() if err != nil { return fmt.Errorf("validating label selector: %w", err) } listOpt := metav1.ListOptions{ LabelSelector: selector.String(), } c.logger.Info("cleaning type", "type", t.gvr.String(), "selector", selector.String()) dclient := c.dynamic.Resource(t.gvr) err = dclient.DeleteCollection(ctx, metav1.DeleteOptions{}, listOpt) if err == nil { return nil } if !k8serrors.IsMethodNotSupported(err) { return fmt.Errorf("deleting collection %s", t.gvr.String()) } // delete collection is not supported for some types. // instead we list then delete one by one list, err := dclient.List(ctx, listOpt) if err != nil { return fmt.Errorf("listing %s", t.gvr.String()) } if err := list.EachListItem(func(obj runtime.Object) error { isNamespaced, err := isNamespaced(c.clientset, t.gvr) if err != nil { return fmt.Errorf("checking if namespaced: %w", err) } o, err := meta.Accessor(obj) if err != nil { return fmt.Errorf("accessing object metadata: %w", err) } var nsClient dynamic.ResourceInterface = dclient if isNamespaced { nsClient = dclient.Namespace(o.GetNamespace()) } err = nsClient.Delete(ctx, o.GetName(), metav1.DeleteOptions{}) if err != nil && !k8serrors.IsNotFound(err) { return fmt.Errorf("deleting object %s in %s: %w", o.GetName(), o.GetNamespace(), err) } return nil }); err != nil { return fmt.Errorf("deleting each object: %w", err) } return nil } func (c *cleaner) NeedLeaderElection() bool { return true } func isNamespaced(clientset kubernetes.Interface, gvr schema.GroupVersionResource) (bool, error) { res, err := clientset.Discovery().ServerResourcesForGroupVersion(gvr.GroupVersion().String()) if err != nil { return false, fmt.Errorf("getting server resources for group version: %w", err) } namespaced := false for _, r := range res.APIResources { if r.Name == gvr.Resource { namespaced = r.Namespaced break } } return namespaced, nil }