internal/controllers/reconciliation/reconstitution.go (121 lines of code) (raw):

package reconciliation import ( "context" "fmt" apiv1 "github.com/Azure/eno/api/v1" "github.com/Azure/eno/internal/manager" "github.com/Azure/eno/internal/resource" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) // reconstitutionSource implements Eno's concept of "reconstitution": taking partitioned sets of resources // that were generated during synthesis and handling them as individual controller work items for reconciliation. // // It's implemented as an untracked controller that runs as a Source of the reconciliation controller. type reconstitutionSource struct { client client.Client nonCachedReader client.Reader cache *resource.Cache } func newReconstitutionSource(mgr ctrl.Manager) (source.TypedSource[resource.Request], *resource.Cache, error) { var cache resource.Cache return source.TypedFunc[resource.Request](func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[resource.Request]) error { cache.SetQueue(queue) r := &reconstitutionSource{ client: mgr.GetClient(), nonCachedReader: mgr.GetAPIReader(), cache: &cache, } // This controller's queue uses composition name/namespace as its key c, err := controller.NewTypedUnmanaged[reconcile.Request]("reconstitutionController", mgr, controller.TypedOptions[reconcile.Request]{ LogConstructor: manager.NewTypedLogConstructor[*reconcile.Request](mgr, "reconstitutionController"), Reconciler: r, }) if err != nil { return err } err = c.Watch(source.TypedKind(mgr.GetCache(), &apiv1.Composition{}, &handler.TypedEnqueueRequestForObject[*apiv1.Composition]{})) if err != nil { return err } err = c.Watch(source.TypedKind(mgr.GetCache(), &apiv1.ResourceSlice{}, handler.TypedEnqueueRequestForOwner[*apiv1.ResourceSlice](mgr.GetScheme(), mgr.GetRESTMapper(), &apiv1.Composition{}))) if err != nil { return err } go func() { err := c.Start(ctx) if err != nil { panic(fmt.Sprintf("error while starting reconstitution source controller: %s", err)) } }() return nil }), &cache, nil } func (r *reconstitutionSource) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := logr.FromContextOrDiscard(ctx) comp := &apiv1.Composition{} err := r.client.Get(ctx, req.NamespacedName, comp) if errors.IsNotFound(err) { r.cache.Purge(ctx, req.NamespacedName, nil) return ctrl.Result{}, nil } if err != nil { logger.Error(err, "failed to get composition") return ctrl.Result{}, err } logger = logger.WithValues("compositionName", comp.Name, "compositionNamespace", comp.Namespace) ctx = logr.NewContext(ctx, logger) // The reconciliation controller assumes that the previous synthesis will be loaded first filled, err := r.populateCache(ctx, comp, comp.Status.PreviousSynthesis) if err != nil { logger.Error(err, "failed to process previous state") return ctrl.Result{}, err } if filled { return ctrl.Result{Requeue: true}, nil } filled, err = r.populateCache(ctx, comp, comp.Status.CurrentSynthesis) if err != nil { logger.Error(err, "failed to process current state") return ctrl.Result{}, err } if filled { return ctrl.Result{Requeue: true}, nil } r.cache.Purge(ctx, req.NamespacedName, comp) return ctrl.Result{}, nil } func (r *reconstitutionSource) populateCache(ctx context.Context, comp *apiv1.Composition, synthesis *apiv1.Synthesis) (bool, error) { if synthesis == nil || synthesis.Synthesized == nil { // synthesis is still in progress return false, nil } // The informers cache an abbreviated representation of the resource slices to save memory // We can use them for status but not for spec slices := make([]apiv1.ResourceSlice, len(synthesis.ResourceSlices)) for i, ref := range synthesis.ResourceSlices { slice := apiv1.ResourceSlice{} slice.Name = ref.Name slice.Namespace = comp.Namespace err := r.client.Get(ctx, client.ObjectKeyFromObject(&slice), &slice) if err != nil { return false, client.IgnoreNotFound(fmt.Errorf("unable to get resource slice (cached): %w", err)) } slices[i] = slice } compNSN := types.NamespacedName{Namespace: comp.Namespace, Name: comp.Name} if r.cache.Visit(ctx, comp, synthesis.UUID, slices) { return false, nil } // Get the full resource slices to populate the cache // But don't use the status since it might be ahead of the informer for i, ref := range synthesis.ResourceSlices { slice := apiv1.ResourceSlice{} slice.Name = ref.Name slice.Namespace = comp.Namespace err := r.nonCachedReader.Get(ctx, client.ObjectKeyFromObject(&slice), &slice) if err != nil { return false, client.IgnoreNotFound(fmt.Errorf("unable to get resource slice (no cache): %w", err)) } slices[i] = slice } r.cache.Fill(ctx, compNSN, synthesis.UUID, slices) return true, nil }