in pkg/controllers/clusterresourceplacement/work_propagation.go [47:134]
// scheduleWork creates or updates one Work object per target cluster of the
// placement so that each of them carries the given manifests.
//
// For every name in placement.Status.TargetClusters it looks up the Work that
// is named after the placement in the namespace derived from the cluster name
// via utils.NamespaceNameFormat. A missing Work is created; an existing one is
// updated unless its spec-hash annotation or its manifests already match the
// desired ones. A failure on one cluster does not stop the loop: per-cluster
// errors are collected and returned as a single aggregate. The function
// returns early only when hashing the desired workload fails.
func (r *Reconciler) scheduleWork(ctx context.Context, placement *fleetv1alpha1.ClusterResourcePlacement,
	manifests []workv1alpha1.Manifest) error {
	var allErr []error
	memberClusterNames := placement.Status.TargetClusters
	workName := placement.Name
	workerOwnerRef := metav1.OwnerReference{
		APIVersion:         placement.GroupVersionKind().GroupVersion().String(),
		Kind:               placement.GroupVersionKind().Kind,
		Name:               placement.GetName(),
		UID:                placement.GetUID(),
		BlockOwnerDeletion: ptr.To(true),
		Controller:         ptr.To(true),
	}
	workerSpec := workv1alpha1.WorkSpec{
		Workload: workv1alpha1.WorkloadTemplate{
			Manifests: manifests,
		},
	}
	specHash, err := resource.HashOf(workerSpec.Workload)
	if err != nil {
		return fmt.Errorf("failed to calculate the spec hash of the newly generated work resource: %w", err)
	}

	// The timestamp is taken once so every Work touched in this pass records
	// the same update time.
	lastUpdateTime := time.Now().Format(time.RFC3339)
	// Each Work gets its own label and annotation maps. Maps are reference
	// types, so attaching one shared map to several objects would let a
	// mutation made through one object (e.g. if the client writes the API
	// server's response back into it) show up on the objects built afterwards.
	newWorkLabels := func() map[string]string {
		return map[string]string{
			utils.LabelWorkPlacementName: placement.GetName(),
			utils.LabelFleetObj:          utils.LabelFleetObjValue,
		}
	}
	newWorkAnnotations := func() map[string]string {
		return map[string]string{
			utils.LastWorkUpdateTimeAnnotationKey: lastUpdateTime,
			specHashAnnotationKey:                 specHash,
		}
	}

	// changed records whether at least one Work was successfully created or updated.
	changed := false
	for _, memberClusterName := range memberClusterNames {
		memberClusterNsName := fmt.Sprintf(utils.NamespaceNameFormat, memberClusterName)
		curWork, err := r.getResourceBinding(memberClusterNsName, workName)
		if err != nil {
			if !apierrors.IsNotFound(err) {
				// Report the namespace that was actually queried, like the
				// create and update errors below do.
				allErr = append(allErr, fmt.Errorf("failed to get the work obj %s in namespace %s: %w", workName, memberClusterNsName, err))
				continue
			}
			// create the work CR since it doesn't exist
			workCR := &workv1alpha1.Work{
				ObjectMeta: metav1.ObjectMeta{
					Namespace: memberClusterNsName,
					Name:      workName,
					OwnerReferences: []metav1.OwnerReference{
						workerOwnerRef,
					},
					Labels:      newWorkLabels(),
					Annotations: newWorkAnnotations(),
				},
				Spec: workerSpec,
			}
			if createErr := r.Client.Create(ctx, workCR, client.FieldOwner(utils.PlacementFieldManagerName)); createErr != nil {
				klog.ErrorS(createErr, "failed to create the work", "work", workName, "namespace", memberClusterNsName)
				allErr = append(allErr, fmt.Errorf("failed to create the work obj %s in namespace %s: %w", workName, memberClusterNsName, createErr))
				continue
			}
			klog.V(2).InfoS("created work spec with manifests",
				"member cluster namespace", memberClusterNsName, "work name", workName, "number of manifests", len(manifests))
			changed = true
			continue
		}

		// The Work already exists: skip the write when either the recorded
		// spec hash or the manifests themselves match the desired state.
		existingHash := curWork.GetAnnotations()[specHashAnnotationKey]
		if existingHash == specHash || reflect.DeepEqual(curWork.Spec.Workload.Manifests, workerSpec.Workload.Manifests) {
			klog.V(2).InfoS("skip updating work spec as its identical",
				"member cluster namespace", memberClusterNsName, "work name", workName, "number of manifests", len(manifests))
			continue
		}

		// Overwrite spec, labels, owner references and annotations with the
		// desired values before sending the update.
		curWork.Spec = workerSpec
		curWork.SetLabels(newWorkLabels())
		curWork.SetOwnerReferences([]metav1.OwnerReference{workerOwnerRef})
		curWork.SetAnnotations(newWorkAnnotations())
		if updateErr := r.Client.Update(ctx, curWork, client.FieldOwner(utils.PlacementFieldManagerName)); updateErr != nil {
			allErr = append(allErr, fmt.Errorf("failed to update the work obj %s in namespace %s: %w", workName, memberClusterNsName, updateErr))
			continue
		}
		// Only a successful update counts as a change, mirroring the create path.
		changed = true
		klog.V(2).InfoS("updated work spec with manifests",
			"member cluster namespace", memberClusterNsName, "work name", workName, "number of manifests", len(manifests))
	}

	if changed {
		klog.V(2).InfoS("Applied all work to the selected cluster namespaces", "placement", klog.KObj(placement), "number of clusters", len(memberClusterNames))
	} else {
		klog.V(2).InfoS("Nothing new to apply for the cluster resource placement", "placement", klog.KObj(placement), "number of clusters", len(memberClusterNames))
	}
	return errors.NewAggregate(allErr)
}