pkg/controller/common/resource_reconciler.go (78 lines of code) (raw):
package common
import (
"context"
"time"
"github.com/Azure/aks-app-routing-operator/pkg/controller/controllername"
"github.com/Azure/aks-app-routing-operator/pkg/controller/metrics"
"github.com/Azure/aks-app-routing-operator/pkg/util"
"github.com/go-logr/logr"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type resourceReconciler struct {
name controllername.ControllerNamer
client client.Client
logger logr.Logger
interval, retryInterval time.Duration
resources []client.Object
}
// NewResourceReconciler creates a reconciler that continuously ensures that the provided resources are provisioned
func NewResourceReconciler(manager ctrl.Manager, name controllername.ControllerNamer, resources []client.Object, reconcileInterval time.Duration) error {
metrics.InitControllerMetrics(name)
rr := &resourceReconciler{
name: name,
client: manager.GetClient(),
logger: name.AddToLogger(manager.GetLogger()),
interval: reconcileInterval,
retryInterval: time.Second,
resources: resources,
}
return manager.Add(rr)
}
func (r *resourceReconciler) Start(ctx context.Context) error {
r.logger.Info("starting resource reconciler")
defer r.logger.Info("stopping resource reconciler")
interval := time.Nanosecond // run immediately when starting up
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(util.Jitter(interval, 0.3)):
}
if err := r.tick(ctx); err != nil {
r.logger.Error(err, "reconciling resources")
interval = r.retryInterval
continue
}
interval = r.interval
}
}
func (r *resourceReconciler) tick(ctx context.Context) error {
var err error
start := time.Now()
r.logger.Info("starting to reconcile resources")
defer func() {
r.logger.Info("finished reconciling resources", "latencySec", time.Since(start).Seconds())
metrics.HandleControllerReconcileMetrics(r.name, ctrl.Result{}, err)
}()
for _, res := range r.resources {
lgr := r.logger.WithValues("name", res.GetName(), "namespace", res.GetNamespace(), "kind", res.GetObjectKind().GroupVersionKind())
copy := res.DeepCopyObject().(client.Object)
if copy.GetDeletionTimestamp() != nil {
lgr.Info("deleting resource")
if err = r.client.Delete(ctx, copy); err != nil && !k8serrors.IsNotFound(err) {
r.logger.Error(err, "deleting unneeded resources")
}
continue
}
lgr.Info("upserting resource")
if err = util.Upsert(ctx, r.client, copy); err != nil {
r.logger.Error(err, "upserting resources")
return err
}
}
return nil
}
func (r *resourceReconciler) NeedLeaderElection() bool {
return true
}