in pkg/scheduler/framework/framework.go [807:987]
func (f *framework) runSchedulingCycleForPickNPlacementType(
ctx context.Context,
state *CycleState,
crpName string,
policy *placementv1beta1.ClusterSchedulingPolicySnapshot,
clusters []clusterv1beta1.MemberCluster,
bound, scheduled, unscheduled, obsolete []*placementv1beta1.ClusterResourceBinding,
) (result ctrl.Result, err error) {
policyRef := klog.KObj(policy)
// Retrieve the desired number of clusters from the policy.
//
// Note that for scheduling policies of the PickN type, this annotation is expected to be present.
numOfClusters, err := annotations.ExtractNumOfClustersFromPolicySnapshot(policy)
if err != nil {
klog.ErrorS(err, "Failed to extract number of clusters required from policy snapshot", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
}
// Check if the scheduler should downscale, i.e., mark some scheduled/bound bindings as unscheduled and/or
// clean up all obsolete bindings right away.
//
// Normally obsolete bindings are kept for cross-referencing at the end of the scheduling cycle to minimize
// interruptions caused by scheduling policy change; however, in the case of downscaling, they can be removed
// right away.
//
// To summarize, the scheduler will only downscale when
//
// * the scheduling policy is of the PickN type; and
// * currently there are too many selected clusters, or more specifically too many scheduled/bound bindings
// in the system; or there are exactly the right number of selected clusters, but some obsolete bindings still linger
// in the system.
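//
// For illustration only (hypothetical numbers): with a desired count of 3 clusters, 5
// scheduled/bound bindings in the system would trigger a downscale by 2; with exactly 3
// scheduled/bound bindings but 1 lingering obsolete binding, the scheduler still acts,
// though with a downscale count of 0 (only the obsolete binding is marked as unscheduled).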
if act, downscaleCount := shouldDownscale(policy, numOfClusters, len(scheduled)+len(bound), len(obsolete)); act {
// Downscale if needed.
//
// To minimize interruptions, the scheduler trims scheduled bindings first, and then
// bound bindings; when processing bound bindings, the logic prioritizes trimming
// bindings whose target clusters score lower against the current scheduling policy.
//
// This step will also mark all obsolete bindings (if any) as unscheduled right away.
klog.V(2).InfoS("Downscaling is needed", "clusterSchedulingPolicySnapshot", policyRef, "downscaleCount", downscaleCount)
// Mark all obsolete bindings as unscheduled first.
if err := f.updateBindings(ctx, obsolete, markUnscheduledForAndUpdate); err != nil {
klog.ErrorS(err, "Failed to mark obsolete bindings as unscheduled", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
// Perform actual downscaling; this will be skipped if the downscale count is zero.
scheduled, bound, err = f.downscale(ctx, scheduled, bound, downscaleCount)
if err != nil {
klog.ErrorS(err, "failed to downscale", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
// Update the policy snapshot status with the latest scheduling decisions and condition.
//
// Note that since there is no reliable way to determine the validity of old decisions added
// to the policy snapshot status, we will only update the status with the known facts, i.e.,
// the clusters that are currently selected.
if err := f.updatePolicySnapshotStatusFromBindings(ctx, policy, numOfClusters, nil, nil, scheduled, bound); err != nil {
klog.ErrorS(err, "Failed to update latest scheduling decisions and condition when downscaling", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
// Return immediately as there are no more bindings for the scheduler to schedule at this moment.
return ctrl.Result{}, nil
}
// Check if the scheduler needs to take action; a scheduling cycle is only needed if
// there are currently not enough bindings present.
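//
// For illustration only (hypothetical numbers): with a desired count of 5 clusters and
// 3 scheduled/bound bindings present, the scheduler proceeds to pick more clusters; with
// 5 bindings already present, it only refreshes the policy snapshot status below.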
if !shouldSchedule(numOfClusters, len(bound)+len(scheduled)) {
// No action is needed; however, a status refresh might be warranted.
//
// This is needed as a number of situations (e.g., POST/PUT failures) may lead to inconsistencies between
// the decisions added to the policy snapshot status and the actual list of bindings.
klog.V(2).InfoS("No scheduling is needed", "clusterSchedulingPolicySnapshot", policyRef)
// Note that since there is no reliable way to determine the validity of old decisions added
// to the policy snapshot status, we will only update the status with the known facts, i.e.,
// the clusters that are currently selected.
if err := f.updatePolicySnapshotStatusFromBindings(ctx, policy, numOfClusters, nil, nil, bound, scheduled); err != nil {
klog.ErrorS(err, "Failed to update latest scheduling decisions and condition when no scheduling run is needed", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
// Return immediately as there are no more bindings for the scheduler to schedule at this moment.
return ctrl.Result{}, nil
}
// The scheduler needs to take action; enter the actual scheduling stages.
klog.V(2).InfoS("Scheduling is needed; entering scheduling stages", "clusterSchedulingPolicySnapshot", policyRef)
// Run all the plugins.
//
// Note that it is up to some plugin (by default the same placement anti-affinity plugin)
// to identify clusters that, per the latest scheduling policy, already have placements
// on them. Such clusters will not be scored, nor will they be reported as filtered-out
// clusters.
scored, filtered, err := f.runAllPluginsForPickNPlacementType(ctx, state, policy, numOfClusters, len(bound)+len(scheduled), clusters)
if err != nil {
klog.ErrorS(err, "Failed to run all plugins", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
// Pick the top scored clusters.
klog.V(2).InfoS("Picking clusters", "clusterSchedulingPolicySnapshot", policyRef)
// Calculate the number of clusters to pick.
numOfClustersToPick := calcNumOfClustersToSelect(state.desiredBatchSize, state.batchSizeLimit, len(scored))
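// For illustration only: assuming the helper effectively takes the minimum of the desired
// batch size, the batch size limit, and the number of scored clusters, a desired batch size
// of 4 with a batch size limit of 2 and 3 scored clusters yields 2 clusters to pick.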
// Do a sanity check; normally this branch will never run, as an earlier check
// guarantees that the number of clusters to pick is never greater than the number of
// scored clusters.
if numOfClustersToPick > len(scored) {
err := fmt.Errorf("number of clusters to pick is greater than number of scored clusters: %d > %d", numOfClustersToPick, len(scored))
klog.ErrorS(err, "Failed to calculate number of clusters to pick", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
}
// Pick the clusters.
//
// Note that at this point of the scheduling cycle, any cluster associated with a currently
// bound or scheduled binding should be filtered out already.
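//
// For illustration only: if the scored clusters are member-1 (score 90), member-2 (score 80),
// and member-3 (score 70), and two clusters are to be picked, member-1 and member-2 are
// returned as picked, and member-3 is returned as not picked. (Cluster names here are
// hypothetical.)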
picked, notPicked := pickTopNScoredClusters(scored, numOfClustersToPick)
// Cross-reference the newly picked clusters with obsolete bindings; find out
//
// * bindings that should be created, i.e., create a binding for every newly picked cluster
// that does not yet have a binding associated with it;
// * bindings that should be patched, i.e., associate a binding whose target cluster is picked again
// in the current run with the latest score and the latest scheduling policy snapshot;
// * bindings that should be deleted, i.e., mark a binding as unscheduled if its target cluster is no
// longer picked in the current run.
//
// Fields in the returned bindings are fulfilled and/or refreshed as applicable.
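//
// For illustration only (hypothetical clusters): if member-1 and member-2 are picked, an
// obsolete binding targets member-2, and another obsolete binding targets member-3, then a
// binding is created for member-1, the binding on member-2 is patched with the latest score
// and policy snapshot, and the binding on member-3 is marked as unscheduled. The unscheduled
// bindings passed in are presumably used to avoid creating duplicate bindings for re-picked
// clusters (hence the de-dup in the helper's name).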
klog.V(2).InfoS("Cross-referencing bindings with picked clusters", "clusterSchedulingPolicySnapshot", policyRef, "numOfClustersToPick", numOfClustersToPick)
toCreate, toDelete, toPatch, err := crossReferencePickedClustersAndDeDupBindings(crpName, policy, picked, unscheduled, obsolete)
if err != nil {
klog.ErrorS(err, "Failed to cross-reference bindings with picked clusters", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
// Manipulate bindings accordingly.
klog.V(2).InfoS("Manipulating bindings", "clusterSchedulingPolicySnapshot", policyRef)
if err := f.manipulateBindings(ctx, policy, toCreate, toDelete, toPatch); err != nil {
klog.ErrorS(err, "Failed to manipulate bindings", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
// Requeue if needed.
//
// The scheduler will requeue to pick more clusters for the current policy snapshot if and only if
// * one or more plugins have imposed a valid batch size limit; and
// * the scheduler has found enough clusters for the current policy snapshot per this batch size limit.
//
// Note that the scheduler workflow at this point guarantees that the desired batch size is no less
// than the batch size limit, and that the number of picked clusters is no more than the batch size
// limit.
//
// Also note that if a requeue is needed, the scheduling decisions and condition are updated only
// when there are no more clusters to pick.
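//
// For illustration only (hypothetical numbers): with a desired batch size of 10 and a
// batch size limit of 5, creating/patching 5 bindings in this run triggers a requeue to
// pick the remaining clusters; creating/patching only 3 does not, as another immediate
// run would not find more suitable clusters.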
if shouldRequeue(state.desiredBatchSize, state.batchSizeLimit, len(toCreate)+len(toPatch)) {
return ctrl.Result{Requeue: true}, nil
}
// Extract the patched bindings.
patched := make([]*placementv1beta1.ClusterResourceBinding, 0, len(toPatch))
for _, p := range toPatch {
patched = append(patched, p.updated)
}
// Update policy snapshot status with the latest scheduling decisions and condition.
klog.V(2).InfoS("Updating policy snapshot status", "clusterSchedulingPolicySnapshot", policyRef)
if err := f.updatePolicySnapshotStatusFromBindings(ctx, policy, numOfClusters, notPicked, filtered, toCreate, patched, scheduled, bound); err != nil {
klog.ErrorS(err, "Failed to update latest scheduling decisions and condition", "clusterSchedulingPolicySnapshot", policyRef)
return ctrl.Result{}, err
}
// The scheduling cycle has completed.
return ctrl.Result{}, nil
}