in pkg/controllers/rollout/controller.go [710:801]
// SetupWithManager registers the rollout controller with the given manager.
//
// It stores an event recorder named "rollout-controller" on the reconciler, caps the
// number of concurrent reconciles at r.MaxConcurrentReconciles, and watches:
//   - ClusterResourceSnapshot, ClusterResourceOverrideSnapshot, and ResourceOverrideSnapshot
//     objects (create and generic events);
//   - ClusterResourceOverride and ResourceOverride objects (delete events only), enqueueing
//     a request named after the placement referenced in the override's spec;
//   - ClusterResourceBinding objects (create, update, and generic events);
//   - ClusterResourcePlacement objects (update events only).
//
// The returned error is whatever the controller builder's Complete call reports.
func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
	const controllerName = "rollout-controller"

	// Local shorthand for the work queue type that every event callback receives.
	type requestQueue = workqueue.TypedRateLimitingInterface[reconcile.Request]

	r.recorder = mgr.GetEventRecorderFor(controllerName)

	// Resource snapshots: only create and generic events are handled.
	resourceSnapshotEvents := handler.Funcs{
		CreateFunc: func(_ context.Context, evt event.CreateEvent, queue requestQueue) {
			klog.V(2).InfoS("Handling a resourceSnapshot create event", "resourceSnapshot", klog.KObj(evt.Object))
			handleResourceSnapshot(evt.Object, queue)
		},
		GenericFunc: func(_ context.Context, evt event.GenericEvent, queue requestQueue) {
			klog.V(2).InfoS("Handling a resourceSnapshot generic event", "resourceSnapshot", klog.KObj(evt.Object))
			handleResourceSnapshot(evt.Object, queue)
		},
	}

	// Cluster-scoped override snapshots: only create and generic events are handled.
	clusterOverrideSnapshotEvents := handler.Funcs{
		CreateFunc: func(_ context.Context, evt event.CreateEvent, queue requestQueue) {
			klog.V(2).InfoS("Handling a clusterResourceOverrideSnapshot create event", "clusterResourceOverrideSnapshot", klog.KObj(evt.Object))
			handleClusterResourceOverrideSnapshot(evt.Object, queue)
		},
		GenericFunc: func(_ context.Context, evt event.GenericEvent, queue requestQueue) {
			klog.V(2).InfoS("Handling a clusterResourceOverrideSnapshot generic event", "clusterResourceOverrideSnapshot", klog.KObj(evt.Object))
			handleClusterResourceOverrideSnapshot(evt.Object, queue)
		},
	}

	// Override snapshots: only create and generic events are handled.
	overrideSnapshotEvents := handler.Funcs{
		CreateFunc: func(_ context.Context, evt event.CreateEvent, queue requestQueue) {
			klog.V(2).InfoS("Handling a resourceOverrideSnapshot create event", "resourceOverrideSnapshot", klog.KObj(evt.Object))
			handleResourceOverrideSnapshot(evt.Object, queue)
		},
		GenericFunc: func(_ context.Context, evt event.GenericEvent, queue requestQueue) {
			klog.V(2).InfoS("Handling a resourceOverrideSnapshot generic event", "resourceOverrideSnapshot", klog.KObj(evt.Object))
			handleResourceOverrideSnapshot(evt.Object, queue)
		},
	}

	// Cluster resource overrides: on deletion, enqueue the placement named in the override's spec.
	clusterOverrideEvents := handler.Funcs{
		DeleteFunc: func(_ context.Context, evt event.DeleteEvent, queue requestQueue) {
			override, isClusterOverride := evt.Object.(*fleetv1alpha1.ClusterResourceOverride)
			if !isClusterOverride {
				unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("non ClusterResourceOverride type resource: %+v", evt.Object))
				klog.ErrorS(unexpectedErr, "Rollout controller received invalid ClusterResourceOverride event", "object", klog.KObj(evt.Object))
				return
			}
			placementRef := override.Spec.Placement
			if placementRef == nil {
				// No placement is referenced, so there is nothing to enqueue.
				return
			}
			klog.V(2).InfoS("Handling a clusterResourceOverride delete event", "clusterResourceOverride", klog.KObj(override))
			target := types.NamespacedName{Name: placementRef.Name}
			queue.Add(reconcile.Request{NamespacedName: target})
		},
	}

	// Resource overrides: on deletion, enqueue the placement named in the override's spec.
	overrideEvents := handler.Funcs{
		DeleteFunc: func(_ context.Context, evt event.DeleteEvent, queue requestQueue) {
			override, isOverride := evt.Object.(*fleetv1alpha1.ResourceOverride)
			if !isOverride {
				unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("non ResourceOverride type resource: %+v", evt.Object))
				klog.ErrorS(unexpectedErr, "Rollout controller received invalid ResourceOverride event", "object", klog.KObj(evt.Object))
				return
			}
			placementRef := override.Spec.Placement
			if placementRef == nil {
				// No placement is referenced, so there is nothing to enqueue.
				return
			}
			klog.V(2).InfoS("Handling a resourceOverride delete event", "resourceOverride", klog.KObj(override))
			target := types.NamespacedName{Name: placementRef.Name}
			queue.Add(reconcile.Request{NamespacedName: target})
		},
	}

	// Resource bindings: create, update, and generic events are handled.
	bindingEvents := handler.Funcs{
		CreateFunc: func(_ context.Context, evt event.CreateEvent, queue requestQueue) {
			klog.V(2).InfoS("Handling a resourceBinding create event", "resourceBinding", klog.KObj(evt.Object))
			enqueueResourceBinding(evt.Object, queue)
		},
		UpdateFunc: func(_ context.Context, evt event.UpdateEvent, queue requestQueue) {
			handleResourceBindingUpdated(evt.ObjectNew, evt.ObjectOld, queue)
		},
		GenericFunc: func(_ context.Context, evt event.GenericEvent, queue requestQueue) {
			klog.V(2).InfoS("Handling a resourceBinding generic event", "resourceBinding", klog.KObj(evt.Object))
			enqueueResourceBinding(evt.Object, queue)
		},
	}

	// Besides ClusterResourceSnapshot and ClusterResourceBinding objects, the rollout
	// controller also watches ClusterResourcePlacement objects, so that it can push apply
	// strategy updates to all bindings right away. Create, Delete, and Generic events are
	// left unhandled; they do not concern the rollout controller.
	placementEvents := handler.Funcs{
		UpdateFunc: func(_ context.Context, evt event.UpdateEvent, queue requestQueue) {
			handleCRP(evt.ObjectNew, evt.ObjectOld, queue)
		},
	}

	return runtime.NewControllerManagedBy(mgr).
		Named(controllerName).
		// Set the max number of concurrent reconciles.
		WithOptions(ctrl.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}).
		Watches(&fleetv1beta1.ClusterResourceSnapshot{}, resourceSnapshotEvents).
		Watches(&fleetv1alpha1.ClusterResourceOverrideSnapshot{}, clusterOverrideSnapshotEvents).
		Watches(&fleetv1alpha1.ResourceOverrideSnapshot{}, overrideSnapshotEvents).
		Watches(&fleetv1alpha1.ClusterResourceOverride{}, clusterOverrideEvents).
		Watches(&fleetv1alpha1.ResourceOverride{}, overrideEvents).
		Watches(&fleetv1beta1.ClusterResourceBinding{}, bindingEvents).
		Watches(&fleetv1beta1.ClusterResourcePlacement{}, placementEvents).
		Complete(r)
}