in pkg/controllers/workgenerator/controller.go [412:572]
// syncAllWork reconciles the Work objects that belong to resourceBinding.
//
// It runs in three stages:
//  1. refresh the apply strategy on every work in existingWorks;
//  2. build the desired work objects from the binding's resource snapshots, applying the
//     override snapshots and giving every enveloped config map its own work object;
//  3. create/update the desired works and delete every existing work that is no longer desired.
//
// It returns, in order:
//   - a bool that is false when the call fails while refreshing apply strategies, fetching
//     snapshots or applying overrides, and true for every later outcome (including failures
//     while building or writing works).
//     NOTE(review): presumably this tells the caller whether overrides were applied
//     successfully; confirm against the caller.
//   - a bool reporting whether any work was updated, created or deleted; it is always false
//     when an error is returned.
//   - the error, if any.
func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1beta1.ClusterResourceBinding, existingWorks map[string]*fleetv1beta1.Work, cluster *clusterv1beta1.MemberCluster) (bool, bool, error) {
	updateAny := atomic.NewBool(false)
	resourceBindingRef := klog.KObj(resourceBinding)

	// Refresh the apply strategy for all existing works.
	//
	// This step is performed separately from other refreshes as apply strategy changes are
	// CRP-scoped and independent from the resource snapshot management mechanism. In other
	// words, even if a work has become stranded (i.e., it is linked to a resource snapshot that
	// is no longer present in the system), it should still be able to receive the latest apply
	// strategy update.
	strategyGroup, strategyCtx := errgroup.WithContext(ctx)
	for workName := range existingWorks {
		w := existingWorks[workName]
		strategyGroup.Go(func() error {
			// Use the group's context so that the first failure cancels the remaining refreshes.
			updated, err := r.syncApplyStrategy(strategyCtx, resourceBinding, w)
			if err != nil {
				return err
			}
			if updated {
				updateAny.Store(true)
			}
			return nil
		})
	}
	if updateErr := strategyGroup.Wait(); updateErr != nil {
		return false, false, updateErr
	}

	// the hash256 function can handle empty list https://go.dev/play/p/_4HW17fooXM
	resourceOverrideSnapshotHash, err := resource.HashOf(resourceBinding.Spec.ResourceOverrideSnapshots)
	if err != nil {
		return false, false, controller.NewUnexpectedBehaviorError(err)
	}
	clusterResourceOverrideSnapshotHash, err := resource.HashOf(resourceBinding.Spec.ClusterResourceOverrideSnapshots)
	if err != nil {
		return false, false, controller.NewUnexpectedBehaviorError(err)
	}

	// TODO: check all work synced first before fetching the snapshots after we put ParentResourceOverrideSnapshotHashAnnotation and ParentClusterResourceOverrideSnapshotHashAnnotation in all the work objects
	// Gather all the resource snapshots.
	resourceSnapshots, err := r.fetchAllResourceSnapshots(ctx, resourceBinding)
	if err != nil {
		if errors.Is(err, errResourceSnapshotNotFound) {
			// the resourceIndex is deleted but the works might still be up to date with the binding.
			if areAllWorkSynced(existingWorks, resourceBinding, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash) {
				klog.V(2).InfoS("All the works are synced with the resourceBinding even if the resource snapshot index is removed", "resourceBinding", resourceBindingRef)
				return true, updateAny.Load(), nil
			}
			return false, false, controller.NewUserError(err)
		}
		// TODO(RZ): handle errResourceNotFullyCreated error so we don't need to wait for all the snapshots to be created
		return false, false, err
	}
	croMap, err := r.fetchClusterResourceOverrideSnapshots(ctx, resourceBinding)
	if err != nil {
		return false, false, err
	}
	roMap, err := r.fetchResourceOverrideSnapshots(ctx, resourceBinding)
	if err != nil {
		return false, false, err
	}

	// activeWork records, by name, every work that should exist after this sync.
	activeWork := make(map[string]*fleetv1beta1.Work, len(resourceSnapshots))
	// All create/update/delete requests below share one group (and one context) so that the
	// first failing request cancels the others.
	workGroup, workCtx := errgroup.WithContext(ctx)
	// drainInFlight blocks until the requests already issued for earlier snapshots have
	// finished. It must be called before returning from inside the snapshot loop; otherwise
	// those goroutines would keep writing to the API server after this function has returned.
	drainInFlight := func() {
		// The error that triggers the early return is the one reported to the caller, so the
		// outcome of the already issued requests is deliberately dropped here.
		_ = workGroup.Wait()
	}

	// generate work objects for each resource snapshot
	for i := range resourceSnapshots {
		snapshot := resourceSnapshots[i]
		var newWork []*fleetv1beta1.Work
		workNamePrefix, err := getWorkNamePrefixFromSnapshotName(snapshot)
		if err != nil {
			klog.ErrorS(err, "Encountered a mal-formatted resource snapshot", "resourceSnapshot", klog.KObj(snapshot))
			drainInFlight()
			return false, false, err
		}
		var simpleManifests []fleetv1beta1.Manifest
		for j := range snapshot.Spec.SelectedResources {
			// Work on a copy so that overrides never modify the snapshot object itself.
			selectedResource := snapshot.Spec.SelectedResources[j].DeepCopy()
			// TODO: override the content of the wrapped resource instead of the envelope itself
			resourceDeleted, overrideErr := r.applyOverrides(selectedResource, cluster, croMap, roMap)
			if overrideErr != nil {
				drainInFlight()
				return false, false, overrideErr
			}
			if resourceDeleted {
				klog.V(2).InfoS("The resource is deleted by the override rules", "snapshot", klog.KObj(snapshot), "selectedResource", snapshot.Spec.SelectedResources[j])
				continue
			}
			// we need to special treat configMap with envelopeConfigMapAnnotation annotation,
			// so we need to check the GVK and annotation of the selected resource
			var uResource unstructured.Unstructured
			if unMarshallErr := uResource.UnmarshalJSON(selectedResource.Raw); unMarshallErr != nil {
				klog.ErrorS(unMarshallErr, "work has invalid content", "snapshot", klog.KObj(snapshot), "selectedResource", selectedResource.Raw)
				drainInFlight()
				return true, false, controller.NewUnexpectedBehaviorError(unMarshallErr)
			}
			if uResource.GetObjectKind().GroupVersionKind() == utils.ConfigMapGVK &&
				len(uResource.GetAnnotations()[fleetv1beta1.EnvelopeConfigMapAnnotation]) != 0 {
				// get a work object for the enveloped configMap
				work, err := r.getConfigMapEnvelopWorkObj(ctx, workNamePrefix, resourceBinding, snapshot, &uResource, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash)
				if err != nil {
					drainInFlight()
					return true, false, err
				}
				activeWork[work.Name] = work
				newWork = append(newWork, work)
			} else {
				simpleManifests = append(simpleManifests, fleetv1beta1.Manifest(*selectedResource))
			}
		}
		if len(simpleManifests) == 0 {
			klog.V(2).InfoS("the snapshot contains no resource to apply either because of override or enveloped resources", "snapshot", klog.KObj(snapshot))
		}
		// generate a work object for the manifests even if there is nothing to place
		// to allow CRP to collect the status of the placement
		// TODO (RZ): revisit to see if we need this hack
		work := generateSnapshotWorkObj(workNamePrefix, resourceBinding, snapshot, simpleManifests, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash)
		activeWork[work.Name] = work
		newWork = append(newWork, work)

		// issue the create/update requests for the works of this snapshot in parallel
		for ni := range newWork {
			w := newWork[ni]
			workGroup.Go(func() error {
				// A work that does not exist yet yields a nil entry here.
				updated, err := r.upsertWork(workCtx, w, existingWorks[w.Name].DeepCopy(), snapshot)
				if err != nil {
					return err
				}
				if updated {
					updateAny.Store(true)
				}
				return nil
			})
		}
	}

	// delete the works that are not associated with any resource snapshot
	for i := range existingWorks {
		work := existingWorks[i]
		if _, exist := activeWork[work.Name]; exist {
			continue
		}
		workGroup.Go(func() error {
			if err := r.Client.Delete(workCtx, work); err != nil {
				// A work that is already gone counts as deleted.
				if !apierrors.IsNotFound(err) {
					klog.ErrorS(err, "Failed to delete the no longer needed work", "work", klog.KObj(work))
					return controller.NewAPIServerError(false, err)
				}
			}
			klog.V(2).InfoS("Deleted the work that is not associated with any resource snapshot", "work", klog.KObj(work))
			updateAny.Store(true)
			return nil
		})
	}

	// wait for all the create/update/delete requests to finish
	if updateErr := workGroup.Wait(); updateErr != nil {
		return true, false, updateErr
	}
	klog.V(2).InfoS("Successfully synced all the work associated with the resourceBinding", "updateAny", updateAny.Load(), "resourceBinding", resourceBindingRef)
	return true, updateAny.Load(), nil
}