func()

in pkg/controllers/workapplier/preprocess.go [111:233]


func (r *Reconciler) writeAheadManifestProcessingAttempts(
	ctx context.Context,
	bundles []*manifestProcessingBundle,
	work *fleetv1beta1.Work,
	expectedAppliedWorkOwnerRef *metav1.OwnerReference,
) error {
	workRef := klog.KObj(work)

	// As a shortcut, if there's no spec change in the Work object and the status indicates that
	// a previous apply attempt has been recorded (**successful or not**), Fleet will skip the write-ahead
	// op.
	workAppliedCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied)
	if workAppliedCond != nil && workAppliedCond.ObservedGeneration == work.Generation {
		klog.V(2).InfoS("Attempt to apply the current set of manifests has been made before and the results have been recorded; will skip the write-ahead process", "work", workRef)
		return nil
	}

	// As another shortcut, if the Work object has an apply strategy that has the ReportDiff
	// mode on, Fleet will skip the write-ahead op.
	//
	// Note that in this scenario Fleet will not attempt to remove any left over manifests;
	// such manifests will only get cleaned up when the ReportDiff mode is turned off, or the
	// CRP itself is deleted.
	if work.Spec.ApplyStrategy != nil && work.Spec.ApplyStrategy.Type == fleetv1beta1.ApplyStrategyTypeReportDiff {
		klog.V(2).InfoS("The apply strategy is set to report diff; will skip the write-ahead process", "work", workRef)
		return nil
	}

	// Prepare the status update (the new manifest conditions) for the write-ahead process.
	//
	// Note that even though we pre-allocate the slice, the length is set to 0. This is to
	// accommodate the case where there might manifests that have failed pre-processing;
	// such manifests will not be included in this round's status update.
	manifestCondsForWA := make([]fleetv1beta1.ManifestCondition, 0, len(bundles))

	// Prepare an query index of existing manifest conditions on the Work object for quicker
	// lookups.
	existingManifestCondQIdx := prepareExistingManifestCondQIdx(work.Status.ManifestConditions)

	// For each manifest, verify if it has been tracked in the newly prepared manifest conditions.
	// This helps signal duplicated resources in the Work object.
	checked := make(map[string]bool, len(bundles))
	for idx := range bundles {
		bundle := bundles[idx]
		if bundle.applyErr != nil {
			// Skip a manifest if it cannot be pre-processed, i.e., it can only be identified by
			// its ordinal.
			//
			// Such manifests would still be reported in the status (see the later parts of the
			// reconciliation loop), it is just that they are not relevant in the write-ahead
			// process.
			klog.V(2).InfoS("Skipped a manifest in the write-ahead process as it has failed pre-processing", "work", workRef,
				"ordinal", idx, "applyErr", bundle.applyErr, "applyResTyp", bundle.applyResTyp)
			continue
		}

		// Register the manifest in the checked map; if another manifest with the same identifier
		// has been checked before, Fleet would mark the current manifest as a duplicate and skip
		// it. This is to address a corner case where users might have specified the same manifest
		// twice in resource envelopes; duplication will not occur if the manifests are directly
		// created in the hub cluster.
		//
		// A side note: Golang does support using structs as map keys; preparing the string
		// representations of structs as keys can help performance, though not by much. The reason
		// why string representations are used here is not for performance, though; instead, it
		// is to address the issue that for this comparison, ordinals should be ignored.
		wriStr, err := formatWRIString(bundle.id)
		if err != nil {
			// Normally this branch will never run as all manifests that cannot be decoded has been
			// skipped in the check above. Here Fleet simply skips the manifest.
			klog.ErrorS(err, "Failed to format the work resource identifier string",
				"ordinal", idx, "work", workRef)
			_ = controller.NewUnexpectedBehaviorError(err)
			continue
		}
		if _, found := checked[wriStr]; found {
			klog.V(2).InfoS("A duplicate manifest has been found",
				"ordinal", idx, "work", workRef, "workResourceID", wriStr)
			bundle.applyErr = fmt.Errorf("a duplicate manifest has been found")
			bundle.applyResTyp = ManifestProcessingApplyResultTypeDuplicated
			continue
		}
		checked[wriStr] = true

		// Prepare the manifest conditions for the write-ahead process.
		manifestCondForWA := prepareManifestCondForWriteAhead(wriStr, bundle.id, work.Generation, existingManifestCondQIdx, work.Status.ManifestConditions)
		manifestCondsForWA = append(manifestCondsForWA, manifestCondForWA)

		klog.V(2).InfoS("Prepared write-ahead information for a manifest",
			"manifestObj", klog.KObj(bundle.manifestObj), "workResourceID", wriStr, "work", workRef)
	}

	// Identify any manifests from previous runs that might have been applied and are now left
	// over in the member cluster.
	leftOverManifests := findLeftOverManifests(manifestCondsForWA, existingManifestCondQIdx, work.Status.ManifestConditions)
	if err := r.removeLeftOverManifests(ctx, leftOverManifests, expectedAppliedWorkOwnerRef); err != nil {
		klog.Errorf("Failed to remove left-over manifests (work=%+v, leftOverManifestCount=%d, removalFailureCount=%d)",
			workRef, len(leftOverManifests), len(err.Errors()))
		return fmt.Errorf("failed to remove left-over manifests: %w", err)
	}
	klog.V(2).InfoS("Left-over manifests are found and removed",
		"leftOverManifestCount", len(leftOverManifests), "work", workRef)

	// Update the status.
	//
	// Note that the Work object might have been refreshed by controllers on the hub cluster
	// before this step runs; in this case the current reconciliation loop must be abandoned.
	if work.Status.Conditions == nil {
		// As a sanity check, set an empty set of conditions. Currently the API definition does
		// not allow nil conditions.
		work.Status.Conditions = []metav1.Condition{}
	}
	work.Status.ManifestConditions = manifestCondsForWA
	if err := r.hubClient.Status().Update(ctx, work); err != nil {
		klog.ErrorS(err, "Failed to write ahead manifest processing attempts", "work", workRef)
		return controller.NewAPIServerError(false, fmt.Errorf("failed to write ahead manifest processing attempts: %w", err))
	}
	klog.V(2).InfoS("Write-ahead process completed", "work", workRef)

	// Set the defaults again as the result yielded by the status update might have changed the object.
	defaulter.SetDefaultsWork(work)
	return nil
}