func newReconstitutionSource()

in internal/controllers/reconciliation/reconstitution.go [32:70]


// newReconstitutionSource builds a source whose start function wires up and
// launches an unmanaged "reconstitutionController" backed by a
// reconstitutionSource reconciler.
//
// It returns the source, a pointer to the resource.Cache shared with that
// reconciler, and an error. The error is currently always nil here; all
// fallible work is deferred until the returned source is started.
//
// When the source is started it:
//   - hands the supplied work queue to the cache via SetQueue,
//   - creates the controller with the manager's client and API reader,
//   - watches Compositions directly and ResourceSlices via their owning
//     Composition,
//   - starts the controller in a background goroutine using the start context.
//
// Any setup failure is returned from the start function, wrapped with the
// step that failed.
func newReconstitutionSource(mgr ctrl.Manager) (source.TypedSource[resource.Request], *resource.Cache, error) {
	// Declared outside the closure so the pointer returned to the caller and the
	// one given to the reconciler refer to the same cache instance.
	var cache resource.Cache
	return source.TypedFunc[resource.Request](func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[resource.Request]) error {
		cache.SetQueue(queue)

		r := &reconstitutionSource{
			client:          mgr.GetClient(),
			nonCachedReader: mgr.GetAPIReader(),
			cache:           &cache,
		}

		// This inner controller has its own queue, keyed by composition
		// name/namespace (reconcile.Request), separate from the resource.Request
		// queue passed to this start function.
		c, err := controller.NewTypedUnmanaged[reconcile.Request]("reconstitutionController", mgr, controller.TypedOptions[reconcile.Request]{
			LogConstructor: manager.NewTypedLogConstructor[*reconcile.Request](mgr, "reconstitutionController"),
			Reconciler:     r,
		})
		if err != nil {
			return fmt.Errorf("constructing reconstitution controller: %w", err)
		}

		// Compositions enqueue themselves.
		err = c.Watch(source.TypedKind(mgr.GetCache(), &apiv1.Composition{}, &handler.TypedEnqueueRequestForObject[*apiv1.Composition]{}))
		if err != nil {
			return fmt.Errorf("watching compositions: %w", err)
		}

		// ResourceSlices enqueue the Composition that owns them.
		err = c.Watch(source.TypedKind(mgr.GetCache(), &apiv1.ResourceSlice{}, handler.TypedEnqueueRequestForOwner[*apiv1.ResourceSlice](mgr.GetScheme(), mgr.GetRESTMapper(), &apiv1.Composition{})))
		if err != nil {
			return fmt.Errorf("watching resource slices: %w", err)
		}

		// The controller is unmanaged, so it must be started explicitly. It runs
		// in its own goroutine so this start function can return. By the time
		// Start reports an error there is no caller left to hand it to, hence
		// the panic rather than a silently dead controller.
		go func() {
			if err := c.Start(ctx); err != nil {
				panic(fmt.Sprintf("error while starting reconstitution source controller: %s", err))
			}
		}()

		return nil
	}), &cache, nil
}