func (r *Reconciler) ReconcileV1Alpha1()

in pkg/controllers/clusterresourceplacement/placement_controllerv1alpha1.go [75:195]


func (r *Reconciler) ReconcileV1Alpha1(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) {
	startTime := time.Now()
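	// the queue key is expected to be the name of a ClusterResourcePlacement; any other type is
	// unrecoverable, so the item is parked with a long requeue instead of being retried hot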
	name, ok := key.(string)
	if !ok {
		err := fmt.Errorf("placement key %+v is not of type string", key)
		klog.ErrorS(err, "We have encountered a fatal error that can't be retried, requeue after a day")
		return ctrl.Result{RequeueAfter: time.Hour * 24}, nil
	}

	placementOld, err := r.getPlacement(name)
	if err != nil {
		klog.ErrorS(err, "Failed to get the cluster resource placement in hub", "placement", name)
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	placeRef := klog.KObj(placementOld)
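	// placementNew is the working copy that accumulates this loop's scheduling results, while
	// placementOld keeps the last persisted status so that error paths can still update conditions on it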
	placementNew := placementOld.DeepCopy()
	// add latency log
	defer func() {
		klog.V(2).InfoS("ClusterResourcePlacement reconciliation loop ends", "placement", placeRef, "latency", time.Since(startTime).Milliseconds())
	}()

	// TODO: add finalizer logic if we need it in the future
	klog.V(2).InfoS("Start to reconcile a ClusterResourcePlacement", "placement", placeRef)
	// select the new clusters and record that in the placementNew status
	selectedClusters, scheduleErr := r.selectClusters(placementNew)
	if scheduleErr != nil {
		klog.ErrorS(scheduleErr, "Failed to select the clusters", "placement", placeRef)
		r.updatePlacementScheduledCondition(placementOld, scheduleErr)
		_ = r.Client.Status().Update(ctx, placementOld, client.FieldOwner(utils.PlacementFieldManagerName))
		// TODO: check on certain error (i.e. not cluster scoped) and do not retry
		return ctrl.Result{}, scheduleErr
	}
	if len(selectedClusters) == 0 {
		// no need to continue, we are not placing anything
		klog.V(2).InfoS("No clusters match the placement", "placement", placeRef)
		return r.removeAllWorks(ctx, placementOld)
	}

	klog.V(2).InfoS("Successfully selected clusters", "placement", placementOld.Name, "number of clusters", len(selectedClusters))

	// select the new resources and record the result in the placementNew status
	manifests, scheduleErr := r.selectResources(placementNew)
	if scheduleErr != nil {
		klog.ErrorS(scheduleErr, "failed to select the resources for this placement", "placement", placeRef)
		r.updatePlacementScheduledCondition(placementOld, scheduleErr)
		_ = r.Client.Status().Update(ctx, placementOld, client.FieldOwner(utils.PlacementFieldManagerName))
		return ctrl.Result{}, scheduleErr
	}
	if len(manifests) == 0 {
		// no need to continue, we are not placing anything
		klog.V(2).InfoS("No resources match the placement", "placement", placeRef)
		return r.removeAllWorks(ctx, placementOld)
	}
	klog.V(2).InfoS("Successfully selected resources", "placement", placementOld.Name, "number of resources", len(manifests))

	// persist the union of all the selected resources and clusters between placementNew and placementOld so that we won't
	// get orphaned resources/clusters if the reconcile loop stops between work creation and persisting the placement status
	totalCluster, totalResources, scheduleErr := r.persistSelectedResourceUnion(ctx, placementOld, placementNew)
	if scheduleErr != nil {
		klog.ErrorS(scheduleErr, "failed to record the  work resources ", "placement", placeRef)
		r.updatePlacementScheduledCondition(placementOld, scheduleErr)
		_ = r.Client.Status().Update(ctx, placementOld, client.FieldOwner(utils.PlacementFieldManagerName))
		return ctrl.Result{}, scheduleErr
	}
	klog.V(2).InfoS("Successfully persisted the intermediate scheduling result", "placement", placementOld.Name,
		"totalClusters", totalCluster, "totalResources", totalResources)
	// pick up the newly updated schedule condition so that the last schedule time will change every time we run the reconcile loop
	meta.SetStatusCondition(&placementNew.Status.Conditions, *placementOld.GetCondition(string(fleetv1alpha1.ResourcePlacementConditionTypeScheduled)))
	// pick up the new version so that we can update placementNew without getting it again
	placementNew.SetResourceVersion(placementOld.GetResourceVersion())

	// schedule works for each cluster by placing them in the cluster scoped namespace
	scheduleErr = r.scheduleWork(ctx, placementNew, manifests)
	if scheduleErr != nil {
		klog.ErrorS(scheduleErr, "failed to apply work resources ", "placement", placeRef)
		r.updatePlacementScheduledCondition(placementOld, scheduleErr)
		_ = r.Client.Status().Update(ctx, placementOld, client.FieldOwner(utils.PlacementFieldManagerName))
		return ctrl.Result{}, scheduleErr
	}
	klog.V(2).InfoS("Successfully scheduled work resources", "placement", placementOld.Name, "number of clusters", len(selectedClusters))

	// go through the existing cluster list and remove work from no longer scheduled clusters.
	removed, scheduleErr := r.removeStaleWorks(ctx, placementNew.GetName(), placementOld.Status.TargetClusters, placementNew.Status.TargetClusters)
	if scheduleErr != nil {
		// if we fail here, the works on the newly selected clusters are not removed if those clusters are not picked by
		// the next reconcile loop, as they are not recorded in the old placement status.
		// TODO: add them to the old placement's selected clusters since the works have been created, although the update can still fail
		klog.ErrorS(scheduleErr, "failed to remove work resources from previously selected clusters", "placement", placeRef)
		r.updatePlacementScheduledCondition(placementOld, scheduleErr)
		_ = r.Client.Status().Update(ctx, placementOld, client.FieldOwner(utils.PlacementFieldManagerName))
		return ctrl.Result{}, scheduleErr
	}
	klog.V(2).InfoS("Successfully removed work resources from previously selected clusters", "placement", placementOld.Name, "removed clusters", removed)

	// the schedule has succeeded, so we can now use the placementNew status that contains all the newly selected clusters and resources
	r.updatePlacementScheduledCondition(placementNew, nil)

	// go through all the valid works, get the failed and pending manifests
	hasPending, applyErr := r.collectAllManifestsStatus(placementNew)
	if applyErr != nil {
		klog.ErrorS(applyErr, "failed to collect work resources status from all selected clusters", "placement", placeRef)
		r.updatePlacementAppliedCondition(placementNew, applyErr)
		_ = r.Client.Status().Update(ctx, placementNew, client.FieldOwner(utils.PlacementFieldManagerName))
		return ctrl.Result{}, applyErr
	}
	klog.V(2).InfoS("Successfully collected work resources status from all selected clusters",
		"placement", placementOld.Name, "number of clusters", len(selectedClusters), "hasPending", hasPending,
		"numberFailedPlacement", len(placementNew.Status.FailedResourcePlacements))

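	// set the Applied condition based on the collected status: fully applied, still propagating, or at least one failed manifest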
	if !hasPending && len(placementNew.Status.FailedResourcePlacements) == 0 {
		r.updatePlacementAppliedCondition(placementNew, nil)
	} else if len(placementNew.Status.FailedResourcePlacements) == 0 {
		r.updatePlacementAppliedCondition(placementNew, ErrStillPendingManifest)
	} else {
		r.updatePlacementAppliedCondition(placementNew, ErrFailedManifest)
	}

	// we keep a slow reconcile loop here as a backup.
	// Any update on the work will trigger a new reconcile immediately
	return ctrl.Result{RequeueAfter: 5 * time.Minute}, r.Client.Status().Update(ctx, placementNew, client.FieldOwner(utils.PlacementFieldManagerName))
}
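
The condition helpers called throughout this loop (updatePlacementScheduledCondition and updatePlacementAppliedCondition) live elsewhere in the package and are not shown in this excerpt. As a rough illustration of the pattern, a scheduled-condition helper built on apimachinery's meta.SetStatusCondition might look like the sketch below; it assumes the imports the excerpt already relies on (fleetv1alpha1, meta, metav1), and the reason strings and generation handling are illustrative guesses, not the actual implementation.

// updateScheduledConditionSketch is a hypothetical stand-in for the package's real
// updatePlacementScheduledCondition helper; reason strings here are made up for illustration.
func updateScheduledConditionSketch(placement *fleetv1alpha1.ClusterResourcePlacement, scheduleErr error) {
	cond := metav1.Condition{
		Type:               string(fleetv1alpha1.ResourcePlacementConditionTypeScheduled),
		Status:             metav1.ConditionTrue,
		Reason:             "ScheduleSucceeded", // assumed reason string
		ObservedGeneration: placement.Generation,
	}
	if scheduleErr != nil {
		// a failed scheduling attempt flips the condition and surfaces the error in the message
		cond.Status = metav1.ConditionFalse
		cond.Reason = "ScheduleFailed" // assumed reason string
		cond.Message = scheduleErr.Error()
	}
	// record the condition on the placement status; the reconcile loop persists it with Status().Update
	meta.SetStatusCondition(&placement.Status.Conditions, cond)
}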