func()

in pkg/controllers/workgenerator/controller.go [412:572]


// syncAllWork reconciles the Work objects that belong to resourceBinding: it refreshes the
// apply strategy on every existing work, regenerates the desired works from the resource
// snapshots (after applying the override snapshots for the given cluster), creates/updates
// them, and deletes existing works that no longer correspond to any desired work.
//
// existingWorks is keyed by work name and is only read by this function.
//
// Returns:
//   - first bool: false when the function fails before the overridden resources are available
//     (apply strategy refresh, hashing, snapshot/override fetch, malformed snapshot name,
//     override application); true on success and on failures that occur afterwards.
//     NOTE(review): presumably this tells the caller whether the overrides were applied
//     successfully; confirm against the caller.
//   - second bool: true when at least one work was refreshed, created, updated, or deleted.
//     It is always false when an error is returned.
//   - error: the first failure encountered.
func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1beta1.ClusterResourceBinding, existingWorks map[string]*fleetv1beta1.Work, cluster *clusterv1beta1.MemberCluster) (bool, bool, error) {
	updateAny := atomic.NewBool(false)
	resourceBindingRef := klog.KObj(resourceBinding)

	// Refresh the apply strategy for all existing works.
	//
	// This step is performed separately from other refreshes as apply strategy changes are
	// CRP-scoped and independent from the resource snapshot management mechanism. In other
	// words, even if a work has become stranded (i.e., it is linked to a resource snapshot that
	// is no longer present in the system), it should still be able to receive the latest apply
	// strategy update.
	applyGroup, applyCtx := errgroup.WithContext(ctx)
	for workName := range existingWorks {
		w := existingWorks[workName]
		applyGroup.Go(func() error {
			// Use the group context so that the remaining refreshes are cancelled as soon as
			// one of them fails; the whole step is reported as failed in that case anyway.
			updated, err := r.syncApplyStrategy(applyCtx, resourceBinding, w)
			if err != nil {
				return err
			}
			if updated {
				updateAny.Store(true)
			}
			return nil
		})
	}
	if updateErr := applyGroup.Wait(); updateErr != nil {
		return false, false, updateErr
	}

	// the hash256 function can handle empty list https://go.dev/play/p/_4HW17fooXM
	resourceOverrideSnapshotHash, err := resource.HashOf(resourceBinding.Spec.ResourceOverrideSnapshots)
	if err != nil {
		return false, false, controller.NewUnexpectedBehaviorError(err)
	}
	clusterResourceOverrideSnapshotHash, err := resource.HashOf(resourceBinding.Spec.ClusterResourceOverrideSnapshots)
	if err != nil {
		return false, false, controller.NewUnexpectedBehaviorError(err)
	}
	// TODO: check all work synced first before fetching the snapshots after we put ParentResourceOverrideSnapshotHashAnnotation and ParentClusterResourceOverrideSnapshotHashAnnotation in all the work objects

	// Gather all the resource snapshots
	resourceSnapshots, err := r.fetchAllResourceSnapshots(ctx, resourceBinding)
	if err != nil {
		if errors.Is(err, errResourceSnapshotNotFound) {
			// the resourceIndex is deleted but the works might still be up to date with the binding.
			if areAllWorkSynced(existingWorks, resourceBinding, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash) {
				klog.V(2).InfoS("All the works are synced with the resourceBinding even if the resource snapshot index is removed", "resourceBinding", resourceBindingRef)
				return true, updateAny.Load(), nil
			}
			return false, false, controller.NewUserError(err)
		}
		// TODO(RZ): handle errResourceNotFullyCreated error so we don't need to wait for all the snapshots to be created
		return false, false, err
	}

	croMap, err := r.fetchClusterResourceOverrideSnapshots(ctx, resourceBinding)
	if err != nil {
		return false, false, err
	}

	roMap, err := r.fetchResourceOverrideSnapshots(ctx, resourceBinding)
	if err != nil {
		return false, false, err
	}

	// issue all the create/update requests for the corresponding works for each snapshot in parallel
	activeWork := make(map[string]*fleetv1beta1.Work, len(resourceSnapshots))
	workGroup, workCtx := errgroup.WithContext(ctx)

	// abort is used for every error return once requests may already be in flight. It waits
	// for the goroutines started for earlier snapshots so that none of them keeps running
	// (and touching works) after this function has returned. The error that triggered the
	// abort is the one reported to the caller; a failure of an in-flight request is only logged.
	abort := func(firstResult bool, cause error) (bool, bool, error) {
		if waitErr := workGroup.Wait(); waitErr != nil {
			klog.ErrorS(waitErr, "Failed to finish the in-flight work requests while aborting the sync", "resourceBinding", resourceBindingRef)
		}
		return firstResult, false, cause
	}

	// generate work objects for each resource snapshot
	for i := range resourceSnapshots {
		snapshot := resourceSnapshots[i]
		var newWork []*fleetv1beta1.Work
		workNamePrefix, err := getWorkNamePrefixFromSnapshotName(snapshot)
		if err != nil {
			klog.ErrorS(err, "Encountered a mal-formatted resource snapshot", "resourceSnapshot", klog.KObj(snapshot))
			return abort(false, err)
		}
		var simpleManifests []fleetv1beta1.Manifest
		for j := range snapshot.Spec.SelectedResources {
			// work on a copy so that applying overrides never mutates the snapshot itself
			selectedResource := snapshot.Spec.SelectedResources[j].DeepCopy()
			// TODO: override the content of the wrapped resource instead of the envelope itself
			resourceDeleted, overrideErr := r.applyOverrides(selectedResource, cluster, croMap, roMap)
			if overrideErr != nil {
				return abort(false, overrideErr)
			}
			if resourceDeleted {
				klog.V(2).InfoS("The resource is deleted by the override rules", "snapshot", klog.KObj(snapshot), "selectedResource", snapshot.Spec.SelectedResources[j])
				continue
			}
			// we need to special treat configMap with envelopeConfigMapAnnotation annotation,
			// so we need to check the GVK and annotation of the selected resource
			var uResource unstructured.Unstructured
			if unMarshallErr := uResource.UnmarshalJSON(selectedResource.Raw); unMarshallErr != nil {
				klog.ErrorS(unMarshallErr, "work has invalid content", "snapshot", klog.KObj(snapshot), "selectedResource", selectedResource.Raw)
				return abort(true, controller.NewUnexpectedBehaviorError(unMarshallErr))
			}
			if uResource.GetObjectKind().GroupVersionKind() == utils.ConfigMapGVK &&
				len(uResource.GetAnnotations()[fleetv1beta1.EnvelopeConfigMapAnnotation]) != 0 {
				// get a work object for the enveloped configMap
				work, err := r.getConfigMapEnvelopWorkObj(ctx, workNamePrefix, resourceBinding, snapshot, &uResource, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash)
				if err != nil {
					return abort(true, err)
				}
				activeWork[work.Name] = work
				newWork = append(newWork, work)
			} else {
				simpleManifests = append(simpleManifests, fleetv1beta1.Manifest(*selectedResource))
			}
		}
		if len(simpleManifests) == 0 {
			klog.V(2).InfoS("the snapshot contains no resource to apply either because of override or enveloped resources", "snapshot", klog.KObj(snapshot))
		}
		// generate a work object for the manifests even if there is nothing to place
		// to allow CRP to collect the status of the placement
		// TODO (RZ): revisit to see if we need this hack
		work := generateSnapshotWorkObj(workNamePrefix, resourceBinding, snapshot, simpleManifests, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash)
		activeWork[work.Name] = work
		newWork = append(newWork, work)

		// issue all the create/update requests for the corresponding works for each snapshot in parallel
		for ni := range newWork {
			w := newWork[ni]
			workGroup.Go(func() error {
				// existingWorks has no entry for a work that has yet to be created, so the
				// lookup yields a nil pointer in that case.
				// NOTE(review): this relies on DeepCopy and upsertWork tolerating nil; confirm.
				updated, err := r.upsertWork(workCtx, w, existingWorks[w.Name].DeepCopy(), snapshot)
				if err != nil {
					return err
				}
				if updated {
					updateAny.Store(true)
				}
				return nil
			})
		}
	}

	//  delete the works that are not associated with any resource snapshot
	for i := range existingWorks {
		work := existingWorks[i]
		if _, exist := activeWork[work.Name]; exist {
			continue
		}
		workGroup.Go(func() error {
			// The parent context is used on purpose: a failed create/update does not cancel
			// the clean-up of stale works.
			if err := r.Client.Delete(ctx, work); err != nil {
				if !apierrors.IsNotFound(err) {
					klog.ErrorS(err, "Failed to delete the no longer needed work", "work", klog.KObj(work))
					return controller.NewAPIServerError(false, err)
				}
			}
			// a work that is already gone is treated the same as a successful deletion
			klog.V(2).InfoS("Deleted the work that is not associated with any resource snapshot", "work", klog.KObj(work))
			updateAny.Store(true)
			return nil
		})
	}

	// wait for all the create/update/delete requests to finish
	if updateErr := workGroup.Wait(); updateErr != nil {
		return true, false, updateErr
	}
	klog.V(2).InfoS("Successfully synced all the work associated with the resourceBinding", "updateAny", updateAny.Load(), "resourceBinding", resourceBindingRef)
	return true, updateAny.Load(), nil
}