func (r *Reconciler) Reconcile()

in pkg/controllers/rollout/controller.go [66:227]


func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtime.Result, error) {
	startTime := time.Now()
	crpName := req.NamespacedName.Name
	klog.V(2).InfoS("Start to rollout the bindings", "clusterResourcePlacement", crpName)

	// add latency log
	defer func() {
		klog.V(2).InfoS("Rollout reconciliation loop ends", "clusterResourcePlacement", crpName, "latency", time.Since(startTime).Milliseconds())
	}()

	// Get the cluster resource placement
	crp := fleetv1beta1.ClusterResourcePlacement{}
	if err := r.Client.Get(ctx, client.ObjectKey{Name: crpName}, &crp); err != nil {
		if errors.IsNotFound(err) {
			klog.V(4).InfoS("Ignoring NotFound clusterResourcePlacement", "clusterResourcePlacement", crpName)
			return runtime.Result{}, nil
		}
		klog.ErrorS(err, "Failed to get clusterResourcePlacement", "clusterResourcePlacement", crpName)
		return runtime.Result{}, controller.NewAPIServerError(true, err)
	}
	// check that the crp is not being deleted
	if crp.DeletionTimestamp != nil {
		klog.V(2).InfoS("Ignoring clusterResourcePlacement that is being deleted", "clusterResourcePlacement", crpName)
		return runtime.Result{}, nil
	}

	// check that the CRP actually uses the rollingUpdate strategy
	// TODO: support the rollout all at once type of RolloutStrategy
	if crp.Spec.Strategy.Type != fleetv1beta1.RollingUpdateRolloutStrategyType {
		klog.V(2).InfoS("Ignoring clusterResourcePlacement with non-rolling-update strategy", "clusterResourcePlacement", crpName)
		return runtime.Result{}, nil
	}

	// list all the bindings associated with the clusterResourcePlacement
	// we read from the API server directly to avoid repeated reconcile loops caused by cache inconsistency
	bindingList := &fleetv1beta1.ClusterResourceBindingList{}
	crpLabelMatcher := client.MatchingLabels{
		fleetv1beta1.CRPTrackingLabel: crp.Name,
	}
	if err := r.UncachedReader.List(ctx, bindingList, crpLabelMatcher); err != nil {
		klog.ErrorS(err, "Failed to list all the bindings associated with the clusterResourcePlacement",
			"clusterResourcePlacement", crpName)
		return runtime.Result{}, controller.NewAPIServerError(false, err)
	}
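	// Note (assumption): the boolean passed to controller.NewAPIServerError appears to
	// record whether the failed read was served by the cached client (true for the Get
	// above, false for this uncached List).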
	// take a deep copy of the bindings so that we can safely modify them
	allBindings := make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindingList.Items))
	for _, binding := range bindingList.Items {
		allBindings = append(allBindings, binding.DeepCopy())
	}

	// Process apply strategy updates (if any). This runs independently of the rollout process.
	//
	// Apply strategy changes will be immediately applied to all bindings that have not been
	// marked for deletion yet. Note that even unscheduled bindings will receive this update,
	// as apply strategy changes might have an effect on their Applied and Available status,
	// and consequently on the rollout progress.
	applyStrategyUpdated, err := r.processApplyStrategyUpdates(ctx, &crp, allBindings)
	switch {
	case err != nil:
		klog.ErrorS(err, "Failed to process apply strategy updates", "clusterResourcePlacement", crpName)
		return runtime.Result{}, err
	case applyStrategyUpdated:
		// After the apply strategy is updated (a spec change), all status conditions on the
		// ClusterResourceBinding object will become stale. To simplify the workflow of
		// the rollout controller, Fleet will requeue the request now and let the subsequent
		// reconciliation loop handle the status condition refreshing.
		//
		// Note that the work generator will skip processing ClusterResourceBindings with stale
		// RolloutStarted conditions.
		klog.V(2).InfoS("Apply strategy has been updated; requeue the request", "clusterResourcePlacement", crpName)
		return runtime.Result{Requeue: true}, nil
	default:
		klog.V(2).InfoS("Apply strategy is up to date on all bindings; continue with the rollout process", "clusterResourcePlacement", crpName)
	}

	// handle the case where a cluster was unselected by the scheduler and then selected again, but the unselected binding has not been completely deleted yet
	wait, err := waitForResourcesToCleanUp(allBindings, &crp)
	if err != nil {
		return runtime.Result{}, err
	}
	if wait {
		// wait for the deletion to finish
		klog.V(2).InfoS("Found multiple bindings pointing to the same cluster, wait for the deletion to finish", "clusterResourcePlacement", crpName)
		return runtime.Result{RequeueAfter: 5 * time.Second}, nil
	}
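	// (The 5-second requeue above is a short fixed interval rather than exponential
	// backoff, presumably because binding deletion is expected to complete quickly.)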

	// find the latest clusterResourceSnapshot.
	latestResourceSnapshot, err := r.fetchLatestResourceSnapshot(ctx, crpName)
	if err != nil {
		klog.ErrorS(err, "Failed to find the latest clusterResourceSnapshot for the clusterResourcePlacement",
			"clusterResourcePlacement", crpName)
		return runtime.Result{}, err
	}
	klog.V(2).InfoS("Found the latest resourceSnapshot for the clusterResourcePlacement", "clusterResourcePlacement", crpName, "latestResourceSnapshot", klog.KObj(latestResourceSnapshot))

	// fill out all the default values for CRP just in case the mutation webhook is not enabled.
	defaulter.SetDefaultsClusterResourcePlacement(&crp)
	// Note: there is a corner case where an override is in between snapshots (the old override snapshot is marked as not the latest while the new one has not been created yet).
	// This can result in one of the overrides being removed by the rollout controller, so the first updated cluster can experience
	// a complete removal of the override effect followed by the application of the new override effect.
	// TODO: detect this situation in the FetchAllMatchingOverridesForResourceSnapshot and retry here
	matchedCRO, matchedRO, err := overrider.FetchAllMatchingOverridesForResourceSnapshot(ctx, r.Client, r.InformerManager, crp.Name, latestResourceSnapshot)
	if err != nil {
		klog.ErrorS(err, "Failed to find all matching overrides for the clusterResourcePlacement", "clusterResourcePlacement", crpName)
		return runtime.Result{}, err
	}
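	// matchedCRO and matchedRO hold the matching ClusterResourceOverride (cluster-scoped)
	// and ResourceOverride (namespaced) snapshots for the latest resource snapshot.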

	// pick the bindings to be updated according to the rollout plan
	// staleBoundBindings is a list of "Bound" bindings that are not selected in this round because of the rollout strategy.
	toBeUpdatedBindings, staleBoundBindings, upToDateBoundBindings, needRoll, waitTime, err := r.pickBindingsToRoll(ctx, allBindings, latestResourceSnapshot, &crp, matchedCRO, matchedRO)
	if err != nil {
		klog.ErrorS(err, "Failed to pick the bindings to roll", "clusterResourcePlacement", crpName)
		return runtime.Result{}, err
	}

	if !needRoll {
		klog.V(2).InfoS("No bindings are out of date, stop rolling", "clusterResourcePlacement", crpName)
		// There is a corner case where the rollout controller succeeded in updating the binding spec to the latest one
		// but failed to update the binding conditions during the last reconciliation.
		// Correct the binding status here in case that happened.
		return runtime.Result{}, r.checkAndUpdateStaleBindingsStatus(ctx, allBindings)
	}
	klog.V(2).InfoS("Picked the bindings to be updated",
		"clusterResourcePlacement", crpName,
		"numberOfToBeUpdatedBindings", len(toBeUpdatedBindings),
		"numberOfStaleBindings", len(staleBoundBindings),
		"numberOfUpToDateBindings", len(upToDateBoundBindings))

	// staleBoundBindings contains bindings that need to be updated (binding to a
	// cluster, or upgrading to a newer resource/override snapshot) but are blocked by
	// the rollout strategy.
	//
	// Note that Fleet does not consider unscheduled bindings as stale bindings, even if the
	// status conditions on them have become stale (the work generator will handle them as an
	// exception).
	//
	// TO-DO (chenyu1): evaluate how we could improve the flow to reduce coupling.
	//
	// Update the status first, so that if the rollout (the updateBindings func) fails
	// midway, the controller will recompute the list and the rollout can move forward.
	if err := r.updateStaleBindingsStatus(ctx, staleBoundBindings); err != nil {
		return runtime.Result{}, err
	}
	klog.V(2).InfoS("Successfully updated status of the stale bindings", "clusterResourcePlacement", crpName, "numberOfStaleBindings", len(staleBoundBindings))

	// upToDateBoundBindings contains all the ClusterResourceBindings that do not need to have
	// their resource/override snapshots updated, but might need to have their status updated.
	//
	// Bindings might have up-to-date resource/override snapshots but stale status information when
	// an apply strategy update has just been applied, or when an error occurred during the
	// previous rollout process (specifically, after the spec update but before the status update).
	if err := r.refreshUpToDateBindingStatus(ctx, upToDateBoundBindings); err != nil {
		return runtime.Result{}, err
	}
	klog.V(2).InfoS("Successfully updated status of the up-to-date bindings", "clusterResourcePlacement", crpName, "numberOfUpToDateBindings", len(upToDateBoundBindings))

	// Update all the bindings in parallel according to the rollout plan.
	// We need to requeue the request regardless of whether the binding updates succeed,
	// to avoid the rollout process stalling because time-based binding readiness does not trigger any event.
	// RequeueAfter is set to the time we need to wait for the first applied-but-not-ready binding to become ready.
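	// Note: per controller-runtime semantics, a non-zero RequeueAfter takes precedence over
	// Requeue: true, so the request is retried after waitTime (or immediately, with rate
	// limiting, when waitTime is zero); a non-nil error from updateBindings likewise
	// triggers a rate-limited retry.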
	return runtime.Result{Requeue: true, RequeueAfter: waitTime}, r.updateBindings(ctx, toBeUpdatedBindings)
}
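
For context, below is a minimal, hypothetical sketch of how such a Reconciler could be registered with controller-runtime, mapping ClusterResourceBinding events back to their owning ClusterResourcePlacement through the CRPTrackingLabel. The SetupWithManager name, the watch configuration, and the handler signature (controller-runtime v0.15+) are assumptions for illustration; the actual setup in the Fleet repository may differ.

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	runtime "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// SetupWithManager registers the Reconciler with the manager (illustrative sketch only).
func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
	return runtime.NewControllerManagedBy(mgr).
		// Reconcile on ClusterResourcePlacement changes.
		For(&fleetv1beta1.ClusterResourcePlacement{}).
		// Also reconcile when a binding changes, since rollout progress depends on
		// binding readiness; map the binding back to its CRP via the tracking label.
		Watches(&fleetv1beta1.ClusterResourceBinding{},
			handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
				crpName, ok := obj.GetLabels()[fleetv1beta1.CRPTrackingLabel]
				if !ok {
					return nil
				}
				return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: crpName}}}
			})).
		Complete(r)
}

Watching bindings (rather than relying only on periodic requeues) keeps the controller responsive to readiness changes, while the RequeueAfter returned by Reconcile covers the purely time-based readiness transitions that emit no events.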