in internal/controllers/reconciliation/controller.go [53:90]
func New(mgr ctrl.Manager, opts Options) error {
upstreamClient, err := client.New(opts.Downstream, client.Options{
Scheme: runtime.NewScheme(), // empty scheme since we shouldn't rely on compile-time types
})
if err != nil {
return err
}
src, cache, err := newReconstitutionSource(mgr)
if err != nil {
return err
}
c := &Controller{
client: opts.Manager.GetClient(),
writeBuffer: opts.WriteBuffer,
resourceClient: cache,
timeout: opts.Timeout,
readinessPollInterval: opts.ReadinessPollInterval,
upstreamClient: upstreamClient,
minReconcileInterval: opts.MinReconcileInterval,
disableSSA: opts.DisableServerSideApply,
}
return builder.TypedControllerManagedBy[resource.Request](mgr).
Named("reconciliationController").
WithLogConstructor(manager.NewTypedLogConstructor[*resource.Request](mgr, "reconciliationController")).
WithOptions(controller.TypedOptions[resource.Request]{
// Since this controller uses requeues as feedback instead of watches, the default
// rate limiter's global 10 RPS token bucket quickly becomes a bottleneck.
//
// This rate limiter uses the same per-item rate limiter as the default, but without
// the additional shared/global/non-item-scoped limiter.
RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[resource.Request](5*time.Millisecond, 1000*time.Second),
}).
WatchesRawSource(src).
Complete(c)
}