func()

in pkg/controllers/workgenerator/controller.go [1487:1585]


func (r *Reconciler) SetupWithManager(mgr controllerruntime.Manager) error {
	r.recorder = mgr.GetEventRecorderFor("work generator")
	return controllerruntime.NewControllerManagedBy(mgr).Named("work-generator").
		WithOptions(ctrl.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}). // set the max number of concurrent reconciles
		For(&fleetv1beta1.ClusterResourceBinding{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Watches(&fleetv1beta1.Work{}, &handler.Funcs{
			// we care about work delete event as we want to know when a work is deleted so that we can
			// delete the corresponding resource binding fast.
			DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
				if evt.Object == nil {
					klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("deleteEvent %v received with no metadata", evt)),
						"Failed to process a delete event for work object")
					return
				}
				parentBindingName, exist := evt.Object.GetLabels()[fleetv1beta1.ParentBindingLabel]
				if !exist {
					klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("deleted work has no binding parent")),
						"Could not find the parent binding label", "deleted work", evt.Object, "existing label", evt.Object.GetLabels())
					return
				}
				// Make sure the work is not deleted behind our back
				klog.V(2).InfoS("Received a work delete event", "work", klog.KObj(evt.Object), "parentBindingName", parentBindingName)
				queue.Add(reconcile.Request{NamespacedName: types.NamespacedName{
					Name: parentBindingName,
				}})
			},
			// we care about work update event as we want to know when a work is applied so that we can
			// update the corresponding resource binding status fast.
			UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
				if evt.ObjectOld == nil || evt.ObjectNew == nil {
					klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("updateEvent %v received with no metadata", evt)),
						"Failed to process an update event for work object")
					return
				}
				parentBindingName, exist := evt.ObjectNew.GetLabels()[fleetv1beta1.ParentBindingLabel]
				if !exist {
					klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("work has no binding parent")),
						"Could not find the parent binding label", "updatedWork", evt.ObjectNew, "existing label", evt.ObjectNew.GetLabels())
					return
				}
				oldWork, ok := evt.ObjectOld.(*fleetv1beta1.Work)
				if !ok {
					klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received old object %v not a work object", evt.ObjectOld)),
						"Failed to process an update event for work object")
					return
				}
				newWork, ok := evt.ObjectNew.(*fleetv1beta1.Work)
				if !ok {
					klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("received new object %v not a work object", evt.ObjectNew)),
						"Failed to process an update event for work object")
					return
				}

				if !equality.Semantic.DeepEqual(oldWork.Status, newWork.Status) {
					klog.V(2).InfoS("Work status has been changed", "oldWork", klog.KObj(oldWork), "newWork", klog.KObj(newWork))
				} else {
					oldResourceSnapshot := oldWork.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel]
					newResourceSnapshot := newWork.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel]
					if oldResourceSnapshot == "" || newResourceSnapshot == "" {
						klog.ErrorS(controller.NewUnexpectedBehaviorError(errors.New("found an invalid work without parent-resource-snapshot-index")),
							"Could not find the parent resource snapshot index label", "oldWork", klog.KObj(oldWork), "oldWorkLabels", oldWork.Labels,
							"newWork", klog.KObj(newWork), "newWorkLabels", newWork.Labels)
						return
					}
					oldClusterResourceOverrideSnapshotHash := oldWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation]
					newClusterResourceOverrideSnapshotHash := newWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation]
					oldResourceOverrideSnapshotHash := oldWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation]
					newResourceOverrideSnapshotHash := newWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation]
					if oldClusterResourceOverrideSnapshotHash == "" || newClusterResourceOverrideSnapshotHash == "" ||
						oldResourceOverrideSnapshotHash == "" || newResourceOverrideSnapshotHash == "" {
						klog.ErrorS(controller.NewUnexpectedBehaviorError(errors.New("found an invalid work without override-snapshot-hash")),
							"Could not find the override-snapshot-hash annotation", "oldWork", klog.KObj(oldWork), "oldWorkAnnotations", oldWork.Annotations,
							"newWork", klog.KObj(newWork), "newWorkAnnotations", newWork.Annotations)
						return
					}

					// There is an edge case that, the work spec is the same but from different resourceSnapshots or resourceOverrideSnapshots.
					// WorkGenerator will update the work because of the label/annotation changes, but the generation is the same.
					// When the override update happens, the rollout controller will set the applied condition as false
					// and wait for the workGenerator to update it. The workGenerator will wait for the work status change,
					// but here the status didn't change as the work's spec didn't change
					// In this edge case, we need to requeue the binding to update the binding status.
					if oldResourceSnapshot == newResourceSnapshot &&
						oldClusterResourceOverrideSnapshotHash == newClusterResourceOverrideSnapshotHash &&
						oldResourceOverrideSnapshotHash == newResourceOverrideSnapshotHash {
						klog.V(2).InfoS("The work applied or available condition stayed as true, no need to reconcile", "oldWork", klog.KObj(oldWork), "newWork", klog.KObj(newWork))
						return
					}
				}

				// We need to update the binding status in this case
				klog.V(2).InfoS("Received a work update event that we need to handle", "work", klog.KObj(newWork), "parentBindingName", parentBindingName)
				queue.Add(reconcile.Request{NamespacedName: types.NamespacedName{
					Name: parentBindingName,
				}})
			},
		}).
		Complete(r)
}