pkg/controller/common/resource_reconciler.go (78 lines of code) (raw):

package common import ( "context" "time" "github.com/Azure/aks-app-routing-operator/pkg/controller/controllername" "github.com/Azure/aks-app-routing-operator/pkg/controller/metrics" "github.com/Azure/aks-app-routing-operator/pkg/util" "github.com/go-logr/logr" k8serrors "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) type resourceReconciler struct { name controllername.ControllerNamer client client.Client logger logr.Logger interval, retryInterval time.Duration resources []client.Object } // NewResourceReconciler creates a reconciler that continuously ensures that the provided resources are provisioned func NewResourceReconciler(manager ctrl.Manager, name controllername.ControllerNamer, resources []client.Object, reconcileInterval time.Duration) error { metrics.InitControllerMetrics(name) rr := &resourceReconciler{ name: name, client: manager.GetClient(), logger: name.AddToLogger(manager.GetLogger()), interval: reconcileInterval, retryInterval: time.Second, resources: resources, } return manager.Add(rr) } func (r *resourceReconciler) Start(ctx context.Context) error { r.logger.Info("starting resource reconciler") defer r.logger.Info("stopping resource reconciler") interval := time.Nanosecond // run immediately when starting up for { select { case <-ctx.Done(): return ctx.Err() case <-time.After(util.Jitter(interval, 0.3)): } if err := r.tick(ctx); err != nil { r.logger.Error(err, "reconciling resources") interval = r.retryInterval continue } interval = r.interval } } func (r *resourceReconciler) tick(ctx context.Context) error { var err error start := time.Now() r.logger.Info("starting to reconcile resources") defer func() { r.logger.Info("finished reconciling resources", "latencySec", time.Since(start).Seconds()) metrics.HandleControllerReconcileMetrics(r.name, ctrl.Result{}, err) }() for _, res := range r.resources { lgr := r.logger.WithValues("name", res.GetName(), "namespace", res.GetNamespace(), "kind", res.GetObjectKind().GroupVersionKind()) copy := res.DeepCopyObject().(client.Object) if copy.GetDeletionTimestamp() != nil { lgr.Info("deleting resource") if err = r.client.Delete(ctx, copy); err != nil && !k8serrors.IsNotFound(err) { r.logger.Error(err, "deleting unneeded resources") } continue } lgr.Info("upserting resource") if err = util.Upsert(ctx, r.client, copy); err != nil { r.logger.Error(err, "upserting resources") return err } } return nil } func (r *resourceReconciler) NeedLeaderElection() bool { return true }