func (r *Reconciler) scheduleWork()

in pkg/controllers/clusterresourceplacement/work_propagation.go [47:134]


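// scheduleWork creates or updates a Work object carrying the given manifests
// in the namespace of every target cluster recorded in the placement status.
// Per-cluster failures are collected and returned as a single aggregate error
// so that one failing cluster does not block propagation to the others.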
func (r *Reconciler) scheduleWork(ctx context.Context, placement *fleetv1alpha1.ClusterResourcePlacement,
	manifests []workv1alpha1.Manifest) error {
	var allErr []error
	memberClusterNames := placement.Status.TargetClusters
	workName := placement.Name
	workerOwnerRef := metav1.OwnerReference{
		APIVersion:         placement.GroupVersionKind().GroupVersion().String(),
		Kind:               placement.GroupVersionKind().Kind,
		Name:               placement.GetName(),
		UID:                placement.GetUID(),
		BlockOwnerDeletion: ptr.To(true),
		Controller:         ptr.To(true),
	}
	workerSpec := workv1alpha1.WorkSpec{
		Workload: workv1alpha1.WorkloadTemplate{
			Manifests: manifests,
		},
	}
	specHash, err := resource.HashOf(workerSpec.Workload)
	if err != nil {
		return fmt.Errorf("failed to calculate the spec hash of the newly generated work resource: %w", err)
	}
	workLabels := map[string]string{
		utils.LabelWorkPlacementName: placement.GetName(),
		utils.LabelFleetObj:          utils.LabelFleetObjValue,
	}
	workAnnotation := map[string]string{
		utils.LastWorkUpdateTimeAnnotationKey: time.Now().Format(time.RFC3339),
		specHashAnnotationKey:                 specHash,
	}
	changed := false
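	// Walk every target cluster and create or update the Work object in that
	// cluster's namespace; errors are accumulated rather than returned early.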
	for _, memberClusterName := range memberClusterNames {
		memberClusterNsName := fmt.Sprintf(utils.NamespaceNameFormat, memberClusterName)
		curWork, err := r.getResourceBinding(memberClusterNsName, workName)
		if err != nil {
			if !apierrors.IsNotFound(err) {
				allErr = append(allErr, fmt.Errorf("failed to get the work obj %s in namespace %s: %w", workName, memberClusterName, err))
				continue
			}
			// create the work CR since it doesn't exist
			workCR := &workv1alpha1.Work{
				ObjectMeta: metav1.ObjectMeta{
					Namespace: memberClusterNsName,
					Name:      workName,
					OwnerReferences: []metav1.OwnerReference{
						workerOwnerRef,
					},
					Labels:      workLabels,
					Annotations: workAnnotation,
				},
				Spec: workerSpec,
			}
			if createErr := r.Client.Create(ctx, workCR, client.FieldOwner(utils.PlacementFieldManagerName)); createErr != nil {
				klog.ErrorS(createErr, "failed to create the work", "work", workName, "namespace", memberClusterNsName)
				allErr = append(allErr, fmt.Errorf("failed to create the work obj %s in namespace %s: %w", workName, memberClusterNsName, createErr))
				continue
			}
			klog.V(2).InfoS("created work spec with manifests",
				"member cluster namespace", memberClusterNsName, "work name", workName, "number of manifests", len(manifests))
			changed = true
			continue
		}
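		// The work already exists; skip the update when the stored spec hash
		// (or the manifest list itself) matches the desired spec.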
		existingHash := curWork.GetAnnotations()[specHashAnnotationKey]
		if existingHash == specHash || reflect.DeepEqual(curWork.Spec.Workload.Manifests, workerSpec.Workload.Manifests) {
			klog.V(2).InfoS("skip updating work spec as its identical",
				"member cluster namespace", memberClusterNsName, "work name", workName, "number of manifests", len(manifests))
			continue
		}
		changed = true
		curWork.Spec = workerSpec
		curWork.SetLabels(workLabels)
		curWork.SetOwnerReferences([]metav1.OwnerReference{workerOwnerRef})
		curWork.SetAnnotations(workAnnotation)
		if updateErr := r.Client.Update(ctx, curWork, client.FieldOwner(utils.PlacementFieldManagerName)); updateErr != nil {
			allErr = append(allErr, fmt.Errorf("failed to update the work obj %s in namespace %s: %w", workName, memberClusterNsName, updateErr))
			continue
		}
		klog.V(2).InfoS("updated work spec with manifests",
			"member cluster namespace", memberClusterNsName, "work name", workName, "number of manifests", len(manifests))
	}
	if changed {
		klog.V(2).InfoS("Applied all work to the selected cluster namespaces", "placement", klog.KObj(placement), "number of clusters", len(memberClusterNames))
	} else {
		klog.V(2).InfoS("Nothing new to apply for the cluster resource placement", "placement", klog.KObj(placement), "number of clusters", len(memberClusterNames))
	}

	return errors.NewAggregate(allErr)
}
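
The update path above short-circuits on a spec hash stored as an annotation, so unchanged placements never trigger a Work update. Below is a minimal, self-contained sketch of that pattern; the hashing scheme (SHA-256 over the JSON encoding), the workload type, and the annotation key are stand-ins for illustration and may not match the real resource.HashOf helper or specHashAnnotationKey.

package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
)

// workload stands in for workv1alpha1.WorkloadTemplate in this sketch.
type workload struct {
	Manifests []string `json:"manifests"`
}

// hashOf returns the hex-encoded SHA-256 of the JSON encoding of v.
// The real resource.HashOf helper may use a different scheme.
func hashOf(v interface{}) (string, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", sha256.Sum256(b)), nil
}

func main() {
	const specHashKey = "spec-hash" // stand-in for specHashAnnotationKey

	desired := workload{Manifests: []string{"apps/v1 Deployment nginx"}}
	specHash, err := hashOf(desired)
	if err != nil {
		panic(err)
	}

	// Annotations as they might appear on an existing Work object.
	existingAnnotations := map[string]string{specHashKey: specHash}

	if existingAnnotations[specHashKey] == specHash {
		fmt.Println("spec unchanged, skip the update")
	} else {
		fmt.Println("spec changed, update the Work object")
	}
}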