in pkg/controllers/rollout/controller.go [66:227]
// Reconcile rolls out the ClusterResourceBindings owned by a ClusterResourcePlacement
// according to its rollout strategy. It reads the bindings directly from the API server
// (bypassing the cache), refreshes apply strategies, waits out conflicting bindings that
// are still being deleted, and then updates the picked bindings to the latest resource
// and override snapshots. Returns a requeue result so that time-based binding readiness
// is re-evaluated even when no watch event fires.
func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtime.Result, error) {
	startTime := time.Now()
	crpName := req.NamespacedName.Name
	klog.V(2).InfoS("Start to rollout the bindings", "clusterResourcePlacement", crpName)
	// add latency log
	defer func() {
		klog.V(2).InfoS("Rollout reconciliation loop ends", "clusterResourcePlacement", crpName, "latency", time.Since(startTime).Milliseconds())
	}()
	// Get the cluster resource placement
	crp := fleetv1beta1.ClusterResourcePlacement{}
	if err := r.Client.Get(ctx, client.ObjectKey{Name: crpName}, &crp); err != nil {
		if errors.IsNotFound(err) {
			// The CRP is gone; nothing to roll out.
			klog.V(4).InfoS("Ignoring NotFound clusterResourcePlacement", "clusterResourcePlacement", crpName)
			return runtime.Result{}, nil
		}
		klog.ErrorS(err, "Failed to get clusterResourcePlacement", "clusterResourcePlacement", crpName)
		return runtime.Result{}, controller.NewAPIServerError(true, err)
	}
	// check that the crp is not being deleted
	if crp.DeletionTimestamp != nil {
		klog.V(2).InfoS("Ignoring clusterResourcePlacement that is being deleted", "clusterResourcePlacement", crpName)
		return runtime.Result{}, nil
	}
	// check that it's actually rollingUpdate strategy
	// TODO: support the rollout all at once type of RolloutStrategy
	if crp.Spec.Strategy.Type != fleetv1beta1.RollingUpdateRolloutStrategyType {
		klog.V(2).InfoS("Ignoring clusterResourcePlacement with non-rolling-update strategy", "clusterResourcePlacement", crpName)
		return runtime.Result{}, nil
	}
	// list all the bindings associated with the clusterResourcePlacement
	// we read from the API server directly to avoid the repeated reconcile loop due to cache inconsistency
	bindingList := &fleetv1beta1.ClusterResourceBindingList{}
	crpLabelMatcher := client.MatchingLabels{
		fleetv1beta1.CRPTrackingLabel: crp.Name,
	}
	if err := r.UncachedReader.List(ctx, bindingList, crpLabelMatcher); err != nil {
		klog.ErrorS(err, "Failed to list all the bindings associated with the clusterResourcePlacement",
			"clusterResourcePlacement", crpName)
		return runtime.Result{}, controller.NewAPIServerError(false, err)
	}
	// take a deep copy of the bindings so that we can safely modify them
	allBindings := make([]*fleetv1beta1.ClusterResourceBinding, 0, len(bindingList.Items))
	for _, binding := range bindingList.Items {
		allBindings = append(allBindings, binding.DeepCopy())
	}
	// Process apply strategy updates (if any). This runs independently of the rollout process.
	//
	// Apply strategy changes will be immediately applied to all bindings that have not been
	// marked for deletion yet. Note that even unscheduled bindings will receive this update;
	// as apply strategy changes might have an effect on its Applied and Available status, and
	// consequently on the rollout progress.
	applyStrategyUpdated, err := r.processApplyStrategyUpdates(ctx, &crp, allBindings)
	switch {
	case err != nil:
		klog.ErrorS(err, "Failed to process apply strategy updates", "clusterResourcePlacement", crpName)
		return runtime.Result{}, err
	case applyStrategyUpdated:
		// After the apply strategy is updated (a spec change), all status conditions on the
		// ClusterResourceBinding object will become stale. To simplify the workflow of
		// the rollout controller, Fleet will requeue the request now, and let the subsequent
		// reconciliation loop to handle the status condition refreshing.
		//
		// Note that work generator will skip processing ClusterResourceBindings with stale
		// RolloutStarted conditions.
		klog.V(2).InfoS("Apply strategy has been updated; requeue the request", "clusterResourcePlacement", crpName)
		// Use runtime.Result for consistency with the declared return type
		// (every other return in this method uses the runtime alias).
		return runtime.Result{Requeue: true}, nil
	default:
		klog.V(2).InfoS("Apply strategy is up to date on all bindings; continue with the rollout process", "clusterResourcePlacement", crpName)
	}
	// handle the case that a cluster was unselected by the scheduler and then selected again but the unselected binding is not completely deleted yet
	wait, err := waitForResourcesToCleanUp(allBindings, &crp)
	if err != nil {
		return runtime.Result{}, err
	}
	if wait {
		// wait for the deletion to finish
		klog.V(2).InfoS("Found multiple bindings pointing to the same cluster, wait for the deletion to finish", "clusterResourcePlacement", crpName)
		return runtime.Result{RequeueAfter: 5 * time.Second}, nil
	}
	// find the latest clusterResourceSnapshot.
	latestResourceSnapshot, err := r.fetchLatestResourceSnapshot(ctx, crpName)
	if err != nil {
		klog.ErrorS(err, "Failed to find the latest clusterResourceSnapshot for the clusterResourcePlacement",
			"clusterResourcePlacement", crpName)
		return runtime.Result{}, err
	}
	klog.V(2).InfoS("Found the latest resourceSnapshot for the clusterResourcePlacement", "clusterResourcePlacement", crpName, "latestResourceSnapshot", klog.KObj(latestResourceSnapshot))
	// fill out all the default values for CRP just in case the mutation webhook is not enabled.
	defaulter.SetDefaultsClusterResourcePlacement(&crp)
	// Note: there is a corner case that an override is in-between snapshots (the old one is marked as not the latest while the new one is not created yet)
	// This will result in one of the override is removed by the rollout controller so the first instance of the updated cluster can experience
	// a complete removal of the override effect following by applying the new override effect.
	// TODO: detect this situation in the FetchAllMatchingOverridesForResourceSnapshot and retry here
	matchedCRO, matchedRO, err := overrider.FetchAllMatchingOverridesForResourceSnapshot(ctx, r.Client, r.InformerManager, crp.Name, latestResourceSnapshot)
	if err != nil {
		klog.ErrorS(err, "Failed to find all matching overrides for the clusterResourcePlacement", "clusterResourcePlacement", crpName)
		return runtime.Result{}, err
	}
	// pick the bindings to be updated according to the rollout plan
	// staleBoundBindings is a list of "Bound" bindings and are not selected in this round because of the rollout strategy.
	toBeUpdatedBindings, staleBoundBindings, upToDateBoundBindings, needRoll, waitTime, err := r.pickBindingsToRoll(ctx, allBindings, latestResourceSnapshot, &crp, matchedCRO, matchedRO)
	if err != nil {
		klog.ErrorS(err, "Failed to pick the bindings to roll", "clusterResourcePlacement", crpName)
		return runtime.Result{}, err
	}
	if !needRoll {
		klog.V(2).InfoS("No bindings are out of date, stop rolling", "clusterResourcePlacement", crpName)
		// There is a corner case that rollout controller succeeds to update the binding spec to the latest one,
		// but fails to update the binding conditions when it reconciled it last time.
		// Here it will correct the binding status just in case this happens last time.
		return runtime.Result{}, r.checkAndUpdateStaleBindingsStatus(ctx, allBindings)
	}
	klog.V(2).InfoS("Picked the bindings to be updated",
		"clusterResourcePlacement", crpName,
		"numberOfToBeUpdatedBindings", len(toBeUpdatedBindings),
		"numberOfStaleBindings", len(staleBoundBindings),
		"numberOfUpToDateBindings", len(upToDateBoundBindings))
	// StaleBindings is the list that contains bindings that need to be updated (binding to a
	// cluster, upgrading to a newer resource/override snapshot) but are blocked by
	// the rollout strategy.
	//
	// Note that Fleet does not consider unscheduled bindings as stale bindings, even if the
	// status conditions on them have become stale (the work generator will handle them as an
	// exception).
	//
	// TO-DO (chenyu1): evaluate how we could improve the flow to reduce coupling.
	//
	// Update the status first, so that if the rolling out (updateBindings func) fails in the
	// middle, the controller will recompute the list so the rollout can move forward.
	if err := r.updateStaleBindingsStatus(ctx, staleBoundBindings); err != nil {
		return runtime.Result{}, err
	}
	klog.V(2).InfoS("Successfully updated status of the stale bindings", "clusterResourcePlacement", crpName, "numberOfStaleBindings", len(staleBoundBindings))
	// upToDateBoundBindings contains all the ClusterResourceBindings that does not need to have
	// their resource/override snapshots updated, but might need to have their status updated.
	//
	// Bindings might have up to date resource/override snapshots but stale status information when
	// an apply strategy update has just been applied, or an error has occurred during the
	// previous rollout process (specifically after the spec update but before the status update).
	if err := r.refreshUpToDateBindingStatus(ctx, upToDateBoundBindings); err != nil {
		return runtime.Result{}, err
	}
	klog.V(2).InfoS("Successfully updated status of the up-to-date bindings", "clusterResourcePlacement", crpName, "numberOfUpToDateBindings", len(upToDateBoundBindings))
	// Update all the bindings in parallel according to the rollout plan.
	// We need to requeue the request regardless if the binding updates succeed or not
	// to avoid the case that the rollout process stalling because the time based binding readiness does not trigger any event.
	// Wait the time we need to wait for the first applied but not ready binding to be ready
	return runtime.Result{Requeue: true, RequeueAfter: waitTime}, r.updateBindings(ctx, toBeUpdatedBindings)
}