internal/controllers/reconciliation/controller.go (306 lines of code) (raw):
package reconciliation
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
apiv1 "github.com/Azure/eno/api/v1"
"github.com/Azure/eno/internal/flowcontrol"
"github.com/Azure/eno/internal/manager"
"github.com/Azure/eno/internal/resource"
"github.com/go-logr/logr"
)
// Options carries the dependencies and tunables used by New to construct the
// reconciliation controller.
type Options struct {
	// Manager supplies the client that the controller uses to read Composition objects.
	Manager ctrl.Manager
	// WriteBuffer receives the asynchronous resource state status patches
	// produced at the end of each reconcile.
	WriteBuffer *flowcontrol.ResourceSliceWriteBuffer
	// Downstream is the REST config of the cluster in which synthesized
	// resources are fetched, created, patched, updated, and deleted.
	Downstream *rest.Config
	// DisableServerSideApply makes the controller create missing resources
	// explicitly and update existing ones with merge patches instead of
	// server-side apply.
	DisableServerSideApply bool
	// Timeout is copied onto the controller; it is not read anywhere in the
	// code visible in this file.
	Timeout time.Duration
	// ReadinessPollInterval is the (jittered) requeue delay used while a
	// resource has not become ready yet.
	ReadinessPollInterval time.Duration
	// MinReconcileInterval is the lower bound applied to a resource's own
	// reconcile interval.
	MinReconcileInterval time.Duration
}
// Controller reconciles individual synthesized resources: it compares the
// desired state held in the resource cache with the live object and
// creates, patches, updates, or deletes the live object accordingly.
type Controller struct {
	// client reads Composition objects.
	client client.Client
	// writeBuffer receives asynchronous resource state status patches.
	writeBuffer *flowcontrol.ResourceSliceWriteBuffer
	// resourceClient is the cache of desired resource states, looked up by
	// synthesis UUID and resource reference.
	resourceClient *resource.Cache
	// timeout is populated from Options.Timeout; it is not read anywhere in
	// the code visible in this file.
	timeout time.Duration
	// readinessPollInterval is the requeue delay used while a resource is not ready.
	readinessPollInterval time.Duration
	// upstreamClient talks to the cluster in which resources are reconciled.
	// NOTE: despite its name it is built from Options.Downstream (see New).
	upstreamClient client.Client
	// minReconcileInterval is the lower bound for per-resource reconcile intervals.
	minReconcileInterval time.Duration
	// disableSSA switches updates from server-side apply to merge patches.
	disableSSA bool
}
// New builds the reconciliation controller and registers it with mgr.
//
// Compositions are read through the manager's client, while resources are
// reconciled through a separate, scheme-less client built from
// opts.Downstream. Work items arrive from the reconstitution source rather
// than from watches on the reconciled resources themselves.
//
// opts.Manager is optional: when it is nil, mgr is used to obtain the
// composition client. Previously a nil opts.Manager caused a nil-pointer
// panic even though a perfectly usable manager was passed as an argument.
//
// An error is returned when the downstream client or the reconstitution
// source cannot be created, or when the controller cannot be registered.
func New(mgr ctrl.Manager, opts Options) error {
	upstreamClient, err := client.New(opts.Downstream, client.Options{
		Scheme: runtime.NewScheme(), // empty scheme since we shouldn't rely on compile-time types
	})
	if err != nil {
		return err
	}

	src, cache, err := newReconstitutionSource(mgr)
	if err != nil {
		return err
	}

	// Prefer the manager given in the options (existing behavior), but fall
	// back to the one passed as an argument instead of dereferencing nil.
	compositionMgr := opts.Manager
	if compositionMgr == nil {
		compositionMgr = mgr
	}

	c := &Controller{
		client:                compositionMgr.GetClient(),
		writeBuffer:           opts.WriteBuffer,
		resourceClient:        cache,
		timeout:               opts.Timeout,
		readinessPollInterval: opts.ReadinessPollInterval,
		upstreamClient:        upstreamClient,
		minReconcileInterval:  opts.MinReconcileInterval,
		disableSSA:            opts.DisableServerSideApply,
	}

	return builder.TypedControllerManagedBy[resource.Request](mgr).
		Named("reconciliationController").
		WithLogConstructor(manager.NewTypedLogConstructor[*resource.Request](mgr, "reconciliationController")).
		WithOptions(controller.TypedOptions[resource.Request]{
			// Since this controller uses requeues as feedback instead of watches, the default
			// rate limiter's global 10 RPS token bucket quickly becomes a bottleneck.
			//
			// This rate limiter uses the same per-item rate limiter as the default, but without
			// the additional shared/global/non-item-scoped limiter.
			RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[resource.Request](5*time.Millisecond, 1000*time.Second),
		}).
		WatchesRawSource(src).
		Complete(c)
}
// Reconcile drives a single resource of a composition's current synthesis
// towards its desired state and records the outcome in the resource's status.
//
// The steps are:
//  1. Load the composition named by the request. A missing composition is not
//     an error: there is nothing left to reconcile.
//  2. Look up the desired state in the resource cache by the current synthesis
//     UUID; the request is dropped if the resource does not exist there or is
//     not visible.
//  3. Fetch the live object. "Not found", "missing namespace", and "no matches
//     for kind" errors are all treated as "the object does not exist".
//  4. Evaluate readiness, reconcile the object, and requeue immediately if it
//     was modified so the next pass observes the new state.
//  5. Otherwise enqueue a status patch and schedule the next reconcile via
//     requeue.
func (c *Controller) Reconcile(ctx context.Context, req resource.Request) (ctrl.Result, error) {
	logger := logr.FromContextOrDiscard(ctx)

	comp := &apiv1.Composition{}
	err := c.client.Get(ctx, types.NamespacedName{Name: req.Composition.Name, Namespace: req.Composition.Namespace}, comp)
	if err != nil {
		if client.IgnoreNotFound(err) == nil {
			// The composition is gone. This is an expected condition, so it
			// must not be reported at error level.
			logger.V(1).Info("composition no longer exists - nothing to reconcile")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "failed to get composition")
		return ctrl.Result{}, err
	}
	synthesisUUID := comp.Status.GetCurrentSynthesisUUID()
	logger = logger.WithValues("compositionName", comp.Name, "compositionNamespace", comp.Namespace, "compositionGeneration", comp.Generation, "synthesisUUID", synthesisUUID)

	if comp.Status.CurrentSynthesis == nil {
		return ctrl.Result{}, nil // nothing to do
	}
	// synthesisUUID is already attached above; adding it again would only
	// produce a duplicate key in the structured log output.
	logger = logger.WithValues("synthesizerName", comp.Spec.Synthesizer.Name, "synthesizerGeneration", comp.Status.CurrentSynthesis.ObservedSynthesizerGeneration)
	ctx = logr.NewContext(ctx, logger)

	// Find the current desired state in the cache
	res, visible, exists := c.resourceClient.Get(ctx, synthesisUUID, req.Resource)
	if !exists || !visible {
		return ctrl.Result{}, nil
	}
	logger = logger.WithValues("resourceKind", res.Ref.Kind, "resourceName", res.Ref.Name, "resourceNamespace", res.Ref.Namespace)
	ctx = logr.NewContext(ctx, logger)

	// Find the (optional) previous desired state in the cache
	var prev *resource.Resource
	if syn := comp.Status.PreviousSynthesis; syn != nil {
		prev, _, _ = c.resourceClient.Get(ctx, syn.UUID, req.Resource)
	}

	// Fetch the current resource
	current, err := c.getCurrent(ctx, res)
	if client.IgnoreNotFound(err) != nil && !isErrMissingNS(err) && !isErrNoKindMatch(err) {
		logger.Error(err, "failed to get current state")
		return ctrl.Result{}, err
	}

	// Evaluate resource readiness
	// - Readiness checks are skipped when this version of the resource's desired state has already become ready
	// - Readiness checks are skipped when the resource hasn't changed since the last check
	// - Readiness defaults to true if no checks are given
	var ready *metav1.Time
	status := res.State()
	if status == nil || status.Ready == nil {
		readiness, ok := res.ReadinessChecks.EvalOptionally(ctx, current)
		if ok {
			ready = &readiness.ReadyTime
		}
	} else {
		ready = status.Ready
	}

	modified, err := c.reconcileResource(ctx, comp, prev, res, current)
	if err != nil {
		logger.Error(err, "failed to reconcile resource")
		return ctrl.Result{}, err
	}
	if modified {
		// Requeue right away so the next pass sees the state we just wrote.
		return ctrl.Result{Requeue: true}, nil
	}

	deleted := current == nil ||
		current.GetDeletionTimestamp() != nil ||
		(res.Deleted(comp) && comp.ShouldOrphanResources()) // orphaning should be reflected on the status.

	c.writeBuffer.PatchStatusAsync(ctx, &res.ManifestRef, patchResourceState(deleted, ready))
	return c.requeue(logger, comp, res, ready)
}
// reconcileResource moves the live object (current, nil when it does not
// exist) towards the desired state described by res.
//
// prev is the desired state from the previous synthesis, if any; it is only
// consulted when checking for drift in server-side apply managed fields.
//
// The returned boolean is true when a mutating request was issued against the
// live object, which the caller uses as the signal to requeue. Every call
// records its duration in the reconciliationLatency metric.
func (c *Controller) reconcileResource(ctx context.Context, comp *apiv1.Composition, prev, res *resource.Resource, current *unstructured.Unstructured) (bool, error) {
	logger := logr.FromContextOrDiscard(ctx)

	began := time.Now()
	defer func() {
		elapsed := time.Since(began)
		reconciliationLatency.Observe(float64(elapsed.Milliseconds()))
	}()

	// Deletion: remove the object unless it is already gone or going away.
	if res.Deleted(comp) {
		alreadyGone := current == nil || current.GetDeletionTimestamp() != nil
		if alreadyGone {
			return false, nil // already deleted - nothing to do
		}

		reconciliationActions.WithLabelValues("delete").Inc()
		if err := c.upstreamClient.Delete(ctx, current); err != nil {
			// A not-found error means something else deleted it first.
			return true, client.IgnoreNotFound(fmt.Errorf("deleting resource: %w", err))
		}
		logger.V(0).Info("deleted resource")
		return true, nil
	}

	if current == nil {
		switch {
		case res.Patch != nil:
			// Patches only make sense against an existing object.
			logger.V(1).Info("resource doesn't exist - skipping patch")
			return false, nil

		case res.DisableUpdates || res.Replace || c.disableSSA:
			// Create the resource when it doesn't exist, should exist, and
			// wouldn't be created later by server-side apply.
			reconciliationActions.WithLabelValues("create").Inc()
			if err := c.upstreamClient.Create(ctx, res.Unstructured()); err != nil {
				return false, fmt.Errorf("creating resource: %w", err)
			}
			logger.V(0).Info("created resource")
			return true, nil
		}
	}

	if res.DisableUpdates {
		return false, nil
	}

	// Apply Eno patches
	if res.Patch != nil {
		if !res.NeedsToBePatched(current) {
			return false, nil
		}

		body, err := json.Marshal(&res.Patch)
		if err != nil {
			return false, fmt.Errorf("encoding json patch: %w", err)
		}

		reconciliationActions.WithLabelValues("patch").Inc()
		if err := c.upstreamClient.Patch(ctx, current, client.RawPatch(types.JSONPatchType, body)); err != nil {
			return false, fmt.Errorf("applying patch: %w", err)
		}
		logger.V(0).Info("patched resource", "resourceVersion", current.GetResourceVersion())
		return true, nil
	}

	// Dry-run the update to see if it's needed
	dryRun, err := c.update(ctx, res, current, true)
	if err != nil {
		return false, fmt.Errorf("dry-run applying update: %w", err)
	}
	if resource.Compare(dryRun, current) {
		return false, nil // in sync
	}

	// When using server side apply, make sure we haven't lost any managedFields metadata.
	// Eno should always remove fields that are no longer set by the synthesizer, even if another client messed with managedFields.
	if current != nil && !c.disableSSA && !res.Replace {
		var dryRunPrev *unstructured.Unstructured
		if prev != nil {
			dryRunPrev = prev.Unstructured()
			if err := c.upstreamClient.Patch(ctx, dryRunPrev, client.Apply, client.ForceOwnership, client.FieldOwner("eno"), client.DryRunAll); err != nil {
				return false, fmt.Errorf("getting managed fields values for previous version: %w", err)
			}
		}

		driftFromPrev := false
		if dryRunPrev != nil {
			driftFromPrev = !resource.CompareEnoManagedFields(dryRunPrev.GetManagedFields(), current.GetManagedFields())
		}
		driftFromNext := !resource.CompareEnoManagedFields(dryRun.GetManagedFields(), current.GetManagedFields())

		// The metadata needs repair when it matches neither the previous nor
		// the next desired state (or only the next one, when there is no
		// previous state to compare against).
		if driftFromNext && (driftFromPrev || dryRunPrev == nil) {
			reference := dryRunPrev
			if reference == nil {
				reference = dryRun
			}
			current.SetManagedFields(
				resource.MergeEnoManagedFields(
					current.GetManagedFields(), reference.GetManagedFields()))

			if err := c.upstreamClient.Update(ctx, current, client.FieldOwner("eno")); err != nil {
				return false, fmt.Errorf("updating managed fields metadata: %w", err)
			}
			logger.V(0).Info("corrected drift in managed fields metadata")
			return true, nil
		}
	}

	// Do the actual non-dryrun update
	reconciliationActions.WithLabelValues("apply").Inc()
	updated, err := c.update(ctx, res, current, false)
	if err != nil {
		return false, fmt.Errorf("applying update: %w", err)
	}

	if current != nil {
		oldVersion := current.GetResourceVersion()
		if updated.GetResourceVersion() == oldVersion {
			logger.V(0).Info("resource didn't change after update")
			return false, nil
		}
		logger = logger.WithValues("oldResourceVersion", oldVersion)
	}
	logger.V(0).Info("applied resource", "resourceVersion", updated.GetResourceVersion(), "dryRunResourceVersion", dryRun.GetResourceVersion())
	return true, nil
}
// update writes the desired state of resource to the cluster and returns the
// object as populated by the client call.
//
// The write strategy depends on configuration:
//   - resource.Replace: a full Update of the object
//   - server-side apply disabled: a merge patch computed against current
//   - otherwise: a forced server-side apply owned by the "eno" field manager
//
// When current is non-nil its resource version is copied onto the outgoing
// object. With dryrun set, the request carries the dry-run-all option.
// The outgoing object is returned even when err is non-nil.
func (c *Controller) update(ctx context.Context, resource *resource.Resource, current *unstructured.Unstructured, dryrun bool) (updated *unstructured.Unstructured, err error) {
	updated = resource.Unstructured()
	if current != nil {
		updated.SetResourceVersion(current.GetResourceVersion())
	}

	switch {
	case resource.Replace:
		var updateOpts []client.UpdateOption
		if dryrun {
			updateOpts = append(updateOpts, client.DryRunAll)
		}
		err = c.upstreamClient.Update(ctx, updated, updateOpts...)

	case c.disableSSA:
		var patchOpts []client.PatchOption
		if dryrun {
			patchOpts = append(patchOpts, client.DryRunAll)
		}
		err = c.upstreamClient.Patch(ctx, updated, client.MergeFrom(current), patchOpts...)

	default:
		var patchOpts []client.PatchOption
		if dryrun {
			patchOpts = append(patchOpts, client.DryRunAll)
		}
		patchOpts = append(patchOpts, client.ForceOwnership, client.FieldOwner("eno"))
		err = c.upstreamClient.Patch(ctx, updated, client.Apply, patchOpts...)
	}
	return updated, err
}
// getCurrent fetches the live object that corresponds to resource, identified
// by the resource's group/version/kind, namespace, and name.
//
// It returns a nil object together with the client's error when the lookup
// fails; callers decide which errors mean "does not exist".
func (c *Controller) getCurrent(ctx context.Context, resource *resource.Resource) (*unstructured.Unstructured, error) {
	live := &unstructured.Unstructured{}
	live.SetAPIVersion(resource.GVK.GroupVersion().String())
	live.SetKind(resource.GVK.Kind)
	live.SetNamespace(resource.Ref.Namespace)
	live.SetName(resource.Ref.Name)

	if err := c.upstreamClient.Get(ctx, client.ObjectKeyFromObject(live), live); err != nil {
		return nil, err
	}
	return live, nil
}
// requeue decides when the resource should be reconciled next.
//
//   - Not ready yet: poll again after the readiness poll interval.
//   - No resource, a deleted resource, or no reconcile interval: don't requeue.
//   - Otherwise: requeue after the resource's reconcile interval, raised to
//     the controller's minimum when it is smaller.
//
// Every non-zero delay is jittered by up to 10%.
func (c *Controller) requeue(logger logr.Logger, comp *apiv1.Composition, resource *resource.Resource, ready *metav1.Time) (ctrl.Result, error) {
	if ready == nil {
		pollDelay := wait.Jitter(c.readinessPollInterval, 0.1)
		return ctrl.Result{RequeueAfter: pollDelay}, nil
	}

	if resource == nil || resource.Deleted(comp) || resource.ReconcileInterval == nil {
		return ctrl.Result{}, nil
	}

	delay := resource.ReconcileInterval.Duration
	if floor := c.minReconcileInterval; delay < floor {
		// NOTE(review): the "latency" key carries the configured interval.
		logger.V(1).Info("reconcile interval is too small - using default", "latency", delay, "default", floor)
		delay = floor
	}
	return ctrl.Result{RequeueAfter: wait.Jitter(delay, 0.1)}, nil
}
// patchResourceState returns a status patch function that marks a resource as
// reconciled with the given deleted flag and ready time.
//
// The returned function yields nil when the existing state is already
// reconciled and carries the same deleted flag and ready time (a nil ready
// time is compared as the zero time). Presumably a nil result tells the write
// buffer that no update is needed - confirm against flowcontrol.StatusPatchFn.
func patchResourceState(deleted bool, ready *metav1.Time) flowcontrol.StatusPatchFn {
	return func(rs *apiv1.ResourceState) *apiv1.ResourceState {
		if rs != nil {
			sameReady := ptr.Deref(rs.Ready, metav1.Time{}) == ptr.Deref(ready, metav1.Time{})
			if rs.Deleted == deleted && rs.Reconciled && sameReady {
				return nil
			}
		}

		next := &apiv1.ResourceState{
			Deleted:    deleted,
			Ready:      ready,
			Reconciled: true,
		}
		return next
	}
}
// isErrMissingNS reports whether err is the client-go error produced by
// mutating requests that do not include a namespace.
//
// client-go does not expose this error as a type or sentinel value - it is a
// plain string - so the only option is to match on the message text.
//
// https://github.com/kubernetes/kubernetes/blob/9edabd617945cd23111fd46cfc9a09fe37ed194a/staging/src/k8s.io/client-go/rest/request.go#L1048
func isErrMissingNS(err error) bool {
	const marker = "an empty namespace may not be set"
	return err != nil && strings.Contains(err.Error(), marker)
}
// isErrNoKindMatch reports whether err's message contains the text
// "no matches for kind". A nil error never matches.
func isErrNoKindMatch(err error) bool {
	const marker = "no matches for kind"
	return err != nil && strings.Contains(err.Error(), marker)
}