in internal/controllers/reconciliation/reconstitution.go [32:70]
// newReconstitutionSource builds a source of resource.Request items together
// with the resource.Cache associated with it.
//
// When the returned source is started it:
//   - passes the queue it was given to the cache via SetQueue,
//   - creates an unmanaged controller named "reconstitutionController" whose
//     reconciler is a reconstitutionSource sharing that same cache,
//   - watches Compositions directly, and ResourceSlices through their owning
//     Composition, and
//   - starts that controller on a background goroutine bound to the start
//     context.
//
// The cache pointer is returned immediately, i.e. before the source has been
// started and before SetQueue has been called on it. The returned error is
// currently always nil.
func newReconstitutionSource(mgr ctrl.Manager) (source.TypedSource[resource.Request], *resource.Cache, error) {
	var cache resource.Cache

	start := func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[resource.Request]) error {
		cache.SetQueue(queue)

		r := &reconstitutionSource{
			client:          mgr.GetClient(),
			nonCachedReader: mgr.GetAPIReader(),
			cache:           &cache,
		}

		// This controller's queue uses composition name/namespace as its key
		c, err := controller.NewTypedUnmanaged[reconcile.Request]("reconstitutionController", mgr, controller.TypedOptions[reconcile.Request]{
			LogConstructor: manager.NewTypedLogConstructor[*reconcile.Request](mgr, "reconstitutionController"),
			Reconciler:     r,
		})
		if err != nil {
			return fmt.Errorf("creating reconstitution controller: %w", err)
		}

		err = c.Watch(source.TypedKind(mgr.GetCache(), &apiv1.Composition{}, &handler.TypedEnqueueRequestForObject[*apiv1.Composition]{}))
		if err != nil {
			return fmt.Errorf("watching compositions: %w", err)
		}

		err = c.Watch(source.TypedKind(mgr.GetCache(), &apiv1.ResourceSlice{}, handler.TypedEnqueueRequestForOwner[*apiv1.ResourceSlice](mgr.GetScheme(), mgr.GetRESTMapper(), &apiv1.Composition{})))
		if err != nil {
			return fmt.Errorf("watching resource slices: %w", err)
		}

		// The controller is unmanaged, so nothing else will start it. Its
		// lifetime is tied to ctx.
		go func() {
			// There is no caller to hand the error back to from this goroutine,
			// so a genuine startup failure is fatal. An error reported after ctx
			// has already been cancelled is treated as part of shutdown and is
			// not escalated to a panic.
			if err := c.Start(ctx); err != nil && ctx.Err() == nil {
				panic(fmt.Sprintf("error while starting reconstitution source controller: %s", err))
			}
		}()

		return nil
	}

	return source.TypedFunc[resource.Request](start), &cache, nil
}