func()

in pkg/scheduler/framework/framework.go [807:987]


func (f *framework) runSchedulingCycleForPickNPlacementType(
	ctx context.Context,
	state *CycleState,
	crpName string,
	policy *placementv1beta1.ClusterSchedulingPolicySnapshot,
	clusters []clusterv1beta1.MemberCluster,
	bound, scheduled, unscheduled, obsolete []*placementv1beta1.ClusterResourceBinding,
) (result ctrl.Result, err error) {
	policyRef := klog.KObj(policy)

	// Retrieve the desired number of clusters from the policy.
	//
	// Note that for scheduling policies of the PickN type, this annotation is expected to be present.
	numOfClusters, err := annotations.ExtractNumOfClustersFromPolicySnapshot(policy)
	if err != nil {
		klog.ErrorS(err, "Failed to extract number of clusters required from policy snapshot", "clusterSchedulingPolicySnapshot", policyRef)
		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
	}
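	// For illustration (not a statement about the exact annotation key, which lives in the
	// annotations package): a policy snapshot generated for a PickN policy that requests 3
	// clusters carries an annotation recording that count, so numOfClusters would be 3 here;
	// a snapshot missing the annotation is surfaced as an unexpected-behavior error above.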

	// Check if the scheduler should downscale, i.e., mark some scheduled/bound bindings as unscheduled and/or
	// clean up all obsolete bindings right away.
	//
	// Normally obsolete bindings are kept for cross-referencing at the end of the scheduling cycle to minimize
	// interruptions caused by scheduling policy change; however, in the case of downscaling, they can be removed
	// right away.
	//
	// To summarize, the scheduler will only downscale when
	//
	// * the scheduling policy is of the PickN type; and
	// * currently there are too many selected clusters, or more specifically too many scheduled/bound bindings
	//   in the system; or there are exactly the right number of selected clusters, but some obsolete bindings still linger
	//   in the system.
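	// For example (an illustration of the conditions above, not taken from shouldDownscale
	// itself): with numOfClusters = 3 and 5 scheduled/bound bindings, downscaling runs with a
	// count of 2; with exactly 3 scheduled/bound bindings but lingering obsolete bindings,
	// downscaling still runs, with a count of 0, purely to clear the obsolete bindings.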
	if act, downscaleCount := shouldDownscale(policy, numOfClusters, len(scheduled)+len(bound), len(obsolete)); act {
		// Downscale if needed.
		//
		// To minimize interruptions, the scheduler picks scheduled bindings first, and then
		// bound bindings.
		//
		// This step will also mark all obsolete bindings (if any) as unscheduled right away.
		klog.V(2).InfoS("Downscaling is needed", "clusterSchedulingPolicySnapshot", policyRef, "downscaleCount", downscaleCount)

		// Mark all obsolete bindings as unscheduled first.
		if err := f.updateBindings(ctx, obsolete, markUnscheduledForAndUpdate); err != nil {
			klog.ErrorS(err, "Failed to mark obsolete bindings as unscheduled", "clusterSchedulingPolicySnapshot", policyRef)
			return ctrl.Result{}, err
		}

		// Perform actual downscaling; this will be skipped if the downscale count is zero.
		scheduled, bound, err = f.downscale(ctx, scheduled, bound, downscaleCount)
		if err != nil {
			klog.ErrorS(err, "failed to downscale", "clusterSchedulingPolicySnapshot", policyRef)
			return ctrl.Result{}, err
		}

		// Update the policy snapshot status with the latest scheduling decisions and condition.
		//
		// Note that since there is no reliable way to determine the validity of old decisions added
		// to the policy snapshot status, we will only update the status with the known facts, i.e.,
		// the clusters that are currently selected.
		if err := f.updatePolicySnapshotStatusFromBindings(ctx, policy, numOfClusters, nil, nil, scheduled, bound); err != nil {
			klog.ErrorS(err, "Failed to update latest scheduling decisions and condition when downscaling", "clusterSchedulingPolicySnapshot", policyRef)
			return ctrl.Result{}, err
		}

		// Return immediately as there are no more bindings for the scheduler to schedule at this moment.
		return ctrl.Result{}, nil
	}

	// Check if the scheduler needs to take action; a scheduling cycle is only needed if
	// currently there are not enough bindings.
	if !shouldSchedule(numOfClusters, len(bound)+len(scheduled)) {
		// No action is needed; however, a status refresh might be warranted.
		//
		// This is needed as a number of situations (e.g., POST/PUT failures) may lead to inconsistencies between
		// the decisions added to the policy snapshot status and the actual list of bindings.
		klog.V(2).InfoS("No scheduling is needed", "clusterSchedulingPolicySnapshot", policyRef)
		// Note that since there is no reliable way to determine the validity of old decisions added
		// to the policy snapshot status, we will only update the status with the known facts, i.e.,
		// the clusters that are currently selected.
		if err := f.updatePolicySnapshotStatusFromBindings(ctx, policy, numOfClusters, nil, nil, bound, scheduled); err != nil {
			klog.ErrorS(err, "Failed to update latest scheduling decisions and condition when no scheduling run is needed", "clusterSchedulingPolicySnapshot", policyRef)
			return ctrl.Result{}, err
		}

		// Return immediately as there are no more bindings for the scheduler to schedule at this moment.
		return ctrl.Result{}, nil
	}

	// The scheduler needs to take action; enter the actual scheduling stages.
	klog.V(2).InfoS("Scheduling is needed; entering scheduling stages", "clusterSchedulingPolicySnapshot", policyRef)

	// Run all the plugins.
	//
	// Note that it is up to some plugin (by default the same placement anti-affinity plugin)
	// to identify clusters that already have placements on them, in accordance with the latest
	// scheduling policy. Such clusters will not be scored, nor will they be included
	// among the filtered-out clusters.
	scored, filtered, err := f.runAllPluginsForPickNPlacementType(ctx, state, policy, numOfClusters, len(bound)+len(scheduled), clusters)
	if err != nil {
		klog.ErrorS(err, "Failed to run all plugins", "clusterSchedulingPolicySnapshot", policyRef)
		return ctrl.Result{}, err
	}
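	// At this point, scored holds the candidate clusters along with their aggregate plugin
	// scores, and filtered holds the clusters rejected by filter plugins together with the
	// reasons for rejection (a descriptive note inferred from how the two values are used below).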

	// Pick the top scored clusters.
	klog.V(2).InfoS("Picking clusters", "clusterSchedulingPolicySnapshot", policyRef)

	// Calculate the number of clusters to pick.
	numOfClustersToPick := calcNumOfClustersToSelect(state.desiredBatchSize, state.batchSizeLimit, len(scored))
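	// For example (an illustration of the presumed min-style semantics, not taken from
	// calcNumOfClustersToSelect itself): with a desired batch size of 5, a batch size limit
	// of 3, and 10 scored clusters, 3 clusters would be picked in this cycle; the remainder
	// is handled by the requeue logic further below.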

	// Do a sanity check; normally this branch will never run, as an earlier check
	// guarantees that the number of clusters to pick is never greater than the number of
	// scored clusters.
	if numOfClustersToPick > len(scored) {
		err := fmt.Errorf("number of clusters to pick is greater than number of scored clusters: %d > %d", numOfClustersToPick, len(scored))
		klog.ErrorS(err, "Failed to calculate number of clusters to pick", "clusterSchedulingPolicySnapshot", policyRef)
		return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
	}

	// Pick the clusters.
	//
	// Note that at this point of the scheduling cycle, any cluster associated with a currently
	// bound or scheduled binding should be filtered out already.
	picked, notPicked := pickTopNScoredClusters(scored, numOfClustersToPick)
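	// For example (illustration only): with 6 scored clusters and numOfClustersToPick = 4,
	// the 4 highest-scoring clusters end up in picked and the remaining 2 in notPicked.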

	// Cross-reference the newly picked clusters with obsolete bindings; find out
	//
	// * bindings that should be created, i.e., create a binding for every newly picked cluster
	//   that does not yet have a binding associated with it;
	// * bindings that should be patched, i.e., associate a binding whose target cluster is picked again
	//   in the current run with the latest score and the latest scheduling policy snapshot;
	// * bindings that should be deleted, i.e., mark a binding as unscheduled if its target cluster is no
	//   longer picked in the current run.
	//
	// Fields in the returned bindings are fulfilled and/or refreshed as applicable.
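	// For example (illustration only): if clusters A and B are picked, A already has an
	// obsolete binding and B has none, the cross-referencing yields a patch for A's binding
	// (refreshing its score and policy snapshot reference) and a new binding for B; an
	// obsolete binding targeting a cluster that is no longer picked lands in the
	// to-be-deleted group.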
	klog.V(2).InfoS("Cross-referencing bindings with picked clusters", "clusterSchedulingPolicySnapshot", policyRef, "numOfClustersToPick", numOfClustersToPick)
	toCreate, toDelete, toPatch, err := crossReferencePickedClustersAndDeDupBindings(crpName, policy, picked, unscheduled, obsolete)
	if err != nil {
		klog.ErrorS(err, "Failed to cross-reference bindings with picked clusters", "clusterSchedulingPolicySnapshot", policyRef)
		return ctrl.Result{}, err
	}

	// Manipulate bindings accordingly.
	klog.V(2).InfoS("Manipulating bindings", "clusterSchedulingPolicySnapshot", policyRef)
	if err := f.manipulateBindings(ctx, policy, toCreate, toDelete, toPatch); err != nil {
		klog.ErrorS(err, "Failed to manipulate bindings", "clusterSchedulingPolicySnapshot", policyRef)
		return ctrl.Result{}, err
	}

	// Requeue if needed.
	//
	// The scheduler will requeue to pick more clusters for the current policy snapshot if and only if
	// * one or more plugins have imposed a valid batch size limit; and
	// * the scheduler has found enough clusters for the current policy snapshot per this batch size limit.
	//
	// Note that the scheduler workflow at this point guarantees that the desired batch size is no less
	// than the batch size limit, and that the number of picked clusters is no more than the batch size
	// limit.
	//
	// Also note that if a requeue is needed, the scheduling decisions and condition are updated only
	// when there are no more clusters to pick.
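	// For example (an illustration of the conditions above): with a desired batch size of 10,
	// a batch size limit of 5, and 5 bindings created/patched in this run, the scheduler
	// requeues to pick the remaining clusters; had fewer than 5 bindings been produced, no
	// more suitable clusters are available and the cycle proceeds without a requeue.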
	if shouldRequeue(state.desiredBatchSize, state.batchSizeLimit, len(toCreate)+len(toPatch)) {
		return ctrl.Result{Requeue: true}, nil
	}

	// Extract the patched bindings.
	patched := make([]*placementv1beta1.ClusterResourceBinding, 0, len(toPatch))
	for _, p := range toPatch {
		patched = append(patched, p.updated)
	}

	// Update policy snapshot status with the latest scheduling decisions and condition.
	klog.V(2).InfoS("Updating policy snapshot status", "clusterSchedulingPolicySnapshot", policyRef)
	if err := f.updatePolicySnapshotStatusFromBindings(ctx, policy, numOfClusters, notPicked, filtered, toCreate, patched, scheduled, bound); err != nil {
		klog.ErrorS(err, "Failed to update latest scheduling decisions and condition", "clusterSchedulingPolicySnapshot", policyRef)
		return ctrl.Result{}, err
	}

	// The scheduling cycle has completed.
	return ctrl.Result{}, nil
}
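
A minimal sketch of the batch-size helpers referenced above, inferred only from the comments in this function; the names (calcNumOfClustersToSelectSketch, shouldRequeueSketch) are hypothetical and the real helpers in this package may differ:

// calcNumOfClustersToSelectSketch picks no more clusters than the desired batch size,
// the batch size limit, and the number of clusters that were actually scored.
func calcNumOfClustersToSelectSketch(desiredBatchSize, batchSizeLimit, scoredCount int) int {
	n := desiredBatchSize
	if batchSizeLimit < n {
		n = batchSizeLimit
	}
	if scoredCount < n {
		n = scoredCount
	}
	return n
}

// shouldRequeueSketch requeues only when a plugin has imposed a batch size limit below the
// desired batch size AND the scheduler filled that limit in this cycle, i.e., more clusters
// may still be needed and more may be available in a follow-up run.
func shouldRequeueSketch(desiredBatchSize, batchSizeLimit, generatedBindingCount int) bool {
	return batchSizeLimit < desiredBatchSize && generatedBindingCount == batchSizeLimit
}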