in pkg/controllers/workgenerator/controller.go [1487:1585]
// SetupWithManager registers the reconciler with mgr as the "work-generator" controller.
//
// The controller's primary resource is ClusterResourceBinding, filtered through
// predicate.GenerationChangedPredicate. It additionally watches Work objects with a
// handler that only populates DeleteFunc and UpdateFunc; both enqueue the binding named
// by the work's ParentBindingLabel label when the event is relevant.
//
// As a side effect the event recorder obtained from mgr is stored on the reconciler.
// The returned error is whatever the controller builder's Complete call reports.
func (r *Reconciler) SetupWithManager(mgr controllerruntime.Manager) error {
	r.recorder = mgr.GetEventRecorderFor("work generator")

	// enqueueBinding requests a reconcile of the binding with the given name. Only the
	// name is set on the request; the namespace is left empty.
	enqueueBinding := func(queue workqueue.TypedRateLimitingInterface[reconcile.Request], bindingName string) {
		queue.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: bindingName}})
	}

	// onWorkDeleted reacts to a work being removed by requeueing its parent binding
	// right away instead of waiting for the binding's next event.
	onWorkDeleted := func(_ context.Context, evt event.DeleteEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
		deleted := evt.Object
		if deleted == nil {
			klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("deleteEvent %v received with no metadata", evt)),
				"Failed to process a delete event for work object")
			return
		}
		workLabels := deleted.GetLabels()
		bindingName, labelled := workLabels[fleetv1beta1.ParentBindingLabel]
		if !labelled {
			// Without the parent label there is no binding to requeue.
			klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("deleted work has no binding parent")),
				"Could not find the parent binding label", "deleted work", deleted, "existing label", workLabels)
			return
		}
		// Give the reconciler a chance to react to a work that was deleted behind our back.
		klog.V(2).InfoS("Received a work delete event", "work", klog.KObj(deleted), "parentBindingName", bindingName)
		enqueueBinding(queue, bindingName)
	}

	// onWorkUpdated requeues the parent binding when a work update is one the binding
	// status needs to reflect: either the work status changed, or the work was re-stamped
	// with a different resource snapshot index or override snapshot hash.
	onWorkUpdated := func(_ context.Context, evt event.UpdateEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
		if evt.ObjectOld == nil || evt.ObjectNew == nil {
			klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("updateEvent %v received with no metadata", evt)),
				"Failed to process an update event for work object")
			return
		}
		currentLabels := evt.ObjectNew.GetLabels()
		bindingName, labelled := currentLabels[fleetv1beta1.ParentBindingLabel]
		if !labelled {
			klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("work has no binding parent")),
				"Could not find the parent binding label", "updatedWork", evt.ObjectNew, "existing label", currentLabels)
			return
		}
		previous, isWork := evt.ObjectOld.(*fleetv1beta1.Work)
		if !isWork {
			klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received old object %v not a work object", evt.ObjectOld)),
				"Failed to process an update event for work object")
			return
		}
		current, isWork := evt.ObjectNew.(*fleetv1beta1.Work)
		if !isWork {
			klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received new object %v not a work object", evt.ObjectNew)),
				"Failed to process an update event for work object")
			return
		}
		previousRef, currentRef := klog.KObj(previous), klog.KObj(current)

		if equality.Semantic.DeepEqual(previous.Status, current.Status) {
			// The status is untouched, so the update only matters if the work now
			// originates from a different resource snapshot or override snapshot.
			previousIndex := previous.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel]
			currentIndex := current.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel]
			if previousIndex == "" || currentIndex == "" {
				klog.ErrorS(controller.NewUnexpectedBehaviorError(errors.New("found an invalid work without parent-resource-snapshot-index")),
					"Could not find the parent resource snapshot index label", "oldWork", previousRef, "oldWorkLabels", previous.Labels,
					"newWork", currentRef, "newWorkLabels", current.Labels)
				return
			}

			previousClusterHash := previous.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation]
			currentClusterHash := current.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation]
			previousNamespacedHash := previous.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation]
			currentNamespacedHash := current.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation]
			hashMissing := previousClusterHash == "" || currentClusterHash == "" ||
				previousNamespacedHash == "" || currentNamespacedHash == ""
			if hashMissing {
				klog.ErrorS(controller.NewUnexpectedBehaviorError(errors.New("found an invalid work without override-snapshot-hash")),
					"Could not find the override-snapshot-hash annotation", "oldWork", previousRef, "oldWorkAnnotations", previous.Annotations,
					"newWork", currentRef, "newWorkAnnotations", current.Annotations)
				return
			}

			// Edge case: the work spec can be identical while coming from different
			// resourceSnapshots or resourceOverrideSnapshots. The work generator then only
			// rewrites labels/annotations, so the work generation does not move. After an
			// override update the rollout controller sets the applied condition to false and
			// waits for the work generator to refresh it, while the work generator waits for
			// a work status change that never arrives because the spec did not change.
			// Requeueing the binding on such a label/annotation-only update breaks that wait.
			sameOrigin := previousIndex == currentIndex &&
				previousClusterHash == currentClusterHash &&
				previousNamespacedHash == currentNamespacedHash
			if sameOrigin {
				klog.V(2).InfoS("The work applied or available condition stayed as true, no need to reconcile", "oldWork", previousRef, "newWork", currentRef)
				return
			}
		} else {
			klog.V(2).InfoS("Work status has been changed", "oldWork", previousRef, "newWork", currentRef)
		}

		// Either the status or the work's origin changed: the binding status must be refreshed.
		klog.V(2).InfoS("Received a work update event that we need to handle", "work", currentRef, "parentBindingName", bindingName)
		enqueueBinding(queue, bindingName)
	}

	workEvents := &handler.Funcs{
		DeleteFunc: onWorkDeleted,
		UpdateFunc: onWorkUpdated,
	}

	return controllerruntime.NewControllerManagedBy(mgr).
		Named("work-generator").
		// Cap the number of reconciles that may run concurrently.
		WithOptions(ctrl.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}).
		For(&fleetv1beta1.ClusterResourceBinding{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Watches(&fleetv1beta1.Work{}, workEvents).
		Complete(r)
}