in pkg/controllers/workapplier/preprocess.go [111:233]
// writeAheadManifestProcessingAttempts records, in the status of the given Work object, the
// set of manifests that Fleet is about to process in the current reconciliation loop, so that
// manifests recorded in earlier runs but no longer present in the Work object can be
// identified and removed.
//
// The write-ahead op is skipped entirely (returning nil) when either:
//   - the Work object has an Applied condition whose observed generation matches the current
//     generation, i.e., an apply attempt for this spec has already been recorded (successful
//     or not); or
//   - the Work object's apply strategy is set to ReportDiff.
//
// Otherwise, the method:
//  1. builds one manifest condition per bundle that passed pre-processing (bundles with a
//     non-nil applyErr are skipped). A bundle whose resource identifier string (ordinal
//     ignored) repeats an earlier bundle's is marked as a duplicate in place: its applyErr
//     and applyResTyp fields are set, and it is left out of the new manifest conditions;
//  2. finds left-over manifests by comparing the new manifest conditions with the existing
//     ones, and removes them via removeLeftOverManifests (passing along
//     expectedAppliedWorkOwnerRef);
//  3. replaces work.Status.ManifestConditions with the new set and updates the Work status
//     on the hub cluster; and
//  4. re-applies defaults to the Work object, as the status update may have changed it.
//
// Note that work.Status.ManifestConditions is overwritten in memory before the status update
// is sent, even if that update then fails.
func (r *Reconciler) writeAheadManifestProcessingAttempts(
	ctx context.Context,
	bundles []*manifestProcessingBundle,
	work *fleetv1beta1.Work,
	expectedAppliedWorkOwnerRef *metav1.OwnerReference,
) error {
	workRef := klog.KObj(work)

	// As a shortcut, if there's no spec change in the Work object and the status indicates that
	// a previous apply attempt has been recorded (**successful or not**), Fleet will skip the
	// write-ahead op.
	workAppliedCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied)
	if workAppliedCond != nil && workAppliedCond.ObservedGeneration == work.Generation {
		klog.V(2).InfoS("Attempt to apply the current set of manifests has been made before and the results have been recorded; will skip the write-ahead process", "work", workRef)
		return nil
	}

	// As another shortcut, if the Work object has an apply strategy that has the ReportDiff
	// mode on, Fleet will skip the write-ahead op.
	//
	// Note that in this scenario Fleet will not attempt to remove any left over manifests;
	// such manifests will only get cleaned up when the ReportDiff mode is turned off, or the
	// CRP itself is deleted.
	if work.Spec.ApplyStrategy != nil && work.Spec.ApplyStrategy.Type == fleetv1beta1.ApplyStrategyTypeReportDiff {
		klog.V(2).InfoS("The apply strategy is set to report diff; will skip the write-ahead process", "work", workRef)
		return nil
	}

	// Prepare the status update (the new manifest conditions) for the write-ahead process.
	//
	// Note that even though we pre-allocate the slice, the length is set to 0. This is to
	// accommodate the case where there might be manifests that have failed pre-processing;
	// such manifests will not be included in this round's status update.
	manifestCondsForWA := make([]fleetv1beta1.ManifestCondition, 0, len(bundles))

	// Prepare a query index of existing manifest conditions on the Work object for quicker
	// lookups.
	existingManifestCondQIdx := prepareExistingManifestCondQIdx(work.Status.ManifestConditions)

	// For each manifest, verify if it has been tracked in the newly prepared manifest conditions.
	// This helps signal duplicated resources in the Work object.
	checked := make(map[string]struct{}, len(bundles))
	for idx, bundle := range bundles {
		if bundle.applyErr != nil {
			// Skip a manifest if it cannot be pre-processed, i.e., it can only be identified by
			// its ordinal.
			//
			// Such manifests would still be reported in the status (see the later parts of the
			// reconciliation loop), it is just that they are not relevant in the write-ahead
			// process.
			klog.V(2).InfoS("Skipped a manifest in the write-ahead process as it has failed pre-processing", "work", workRef,
				"ordinal", idx, "applyErr", bundle.applyErr, "applyResTyp", bundle.applyResTyp)
			continue
		}

		// Register the manifest in the checked set; if another manifest with the same identifier
		// has been checked before, Fleet would mark the current manifest as a duplicate and skip
		// it. This is to address a corner case where users might have specified the same manifest
		// twice in resource envelopes; duplication will not occur if the manifests are directly
		// created in the hub cluster.
		//
		// A side note: Golang does support using structs as map keys; preparing the string
		// representations of structs as keys can help performance, though not by much. The reason
		// why string representations are used here is not for performance, though; instead, it
		// is to address the issue that for this comparison, ordinals should be ignored.
		wriStr, err := formatWRIString(bundle.id)
		if err != nil {
			// Normally this branch will never run as all manifests that cannot be decoded have
			// been skipped in the check above. Here Fleet simply skips the manifest.
			//
			// NOTE(review): the bundle is not marked with an apply error here, so it is absent
			// from the write-ahead record yet may still be processed by later stages; confirm
			// this is intended.
			klog.ErrorS(err, "Failed to format the work resource identifier string",
				"ordinal", idx, "work", workRef)
			// The returned error is discarded on purpose; the call is kept so that the
			// unexpected behavior is still flagged by the controller helper.
			_ = controller.NewUnexpectedBehaviorError(err)
			continue
		}
		if _, found := checked[wriStr]; found {
			klog.V(2).InfoS("A duplicate manifest has been found",
				"ordinal", idx, "work", workRef, "workResourceID", wriStr)
			bundle.applyErr = fmt.Errorf("a duplicate manifest has been found")
			bundle.applyResTyp = ManifestProcessingApplyResultTypeDuplicated
			continue
		}
		checked[wriStr] = struct{}{}

		// Prepare the manifest conditions for the write-ahead process.
		manifestCondForWA := prepareManifestCondForWriteAhead(wriStr, bundle.id, work.Generation, existingManifestCondQIdx, work.Status.ManifestConditions)
		manifestCondsForWA = append(manifestCondsForWA, manifestCondForWA)

		klog.V(2).InfoS("Prepared write-ahead information for a manifest",
			"manifestObj", klog.KObj(bundle.manifestObj), "workResourceID", wriStr, "work", workRef)
	}

	// Identify any manifests from previous runs that might have been applied and are now left
	// over in the member cluster.
	leftOverManifests := findLeftOverManifests(manifestCondsForWA, existingManifestCondQIdx, work.Status.ManifestConditions)
	if err := r.removeLeftOverManifests(ctx, leftOverManifests, expectedAppliedWorkOwnerRef); err != nil {
		klog.ErrorS(err, "Failed to remove left-over manifests",
			"work", workRef, "leftOverManifestCount", len(leftOverManifests), "removalFailureCount", len(err.Errors()))
		return fmt.Errorf("failed to remove left-over manifests: %w", err)
	}
	// Only report removals when there actually were left-over manifests; otherwise the log
	// line would claim removals that never happened.
	if len(leftOverManifests) > 0 {
		klog.V(2).InfoS("Left-over manifests are found and removed",
			"leftOverManifestCount", len(leftOverManifests), "work", workRef)
	}

	// Update the status.
	//
	// Note that the Work object might have been refreshed by controllers on the hub cluster
	// before this step runs; in this case the current reconciliation loop must be abandoned.
	if work.Status.Conditions == nil {
		// As a sanity check, set an empty set of conditions. Currently the API definition does
		// not allow nil conditions.
		work.Status.Conditions = []metav1.Condition{}
	}
	work.Status.ManifestConditions = manifestCondsForWA
	if err := r.hubClient.Status().Update(ctx, work); err != nil {
		klog.ErrorS(err, "Failed to write ahead manifest processing attempts", "work", workRef)
		return controller.NewAPIServerError(false, fmt.Errorf("failed to write ahead manifest processing attempts: %w", err))
	}
	klog.V(2).InfoS("Write-ahead process completed", "work", workRef)

	// Set the defaults again as the result yielded by the status update might have changed the object.
	defaulter.SetDefaultsWork(work)
	return nil
}