func (p *preemptor) SelectVictimsOnNode()

in pkg/capacityscheduling/capacity_scheduling.go [476:656]

func (p *preemptor) SelectVictimsOnNode(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	nodeInfo *framework.NodeInfo,
	pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) {
	elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(state)
	if err != nil {
		msg := "Failed to read elasticQuotaSnapshot from cycleState"
		klog.ErrorS(err, msg, "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
		return nil, 0, framework.NewStatus(framework.Unschedulable, msg)
	}

	preFilterState, err := getPreFilterState(state)
	if err != nil {
		msg := "Failed to read preFilterState from cycleState"
		klog.ErrorS(err, msg, "preFilterStateKey", preFilterStateKey)
		return nil, 0, framework.NewStatus(framework.Unschedulable, msg)
	}

	var nominatedPodsReqInEQWithPodReq framework.Resource
	var nominatedPodsReqWithPodReq framework.Resource
	podReq := preFilterState.podReq

	logger := klog.FromContext(ctx)
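	// removePod and addPod keep the nodeInfo snapshot and the PreFilter
	// plugins' cycle state in sync while candidate victims are removed from
	// and re-added to this node during the simulation below.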
	removePod := func(rpi *framework.PodInfo) error {
		if err := nodeInfo.RemovePod(logger, rpi.Pod); err != nil {
			return err
		}
		status := p.fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)
		if !status.IsSuccess() {
			return status.AsError()
		}
		return nil
	}
	addPod := func(api *framework.PodInfo) error {
		nodeInfo.AddPodInfo(api)
		status := p.fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo)
		if !status.IsSuccess() {
			return status.AsError()
		}
		return nil
	}

	elasticQuotaInfos := elasticQuotaSnapshotState.elasticQuotaInfos
	podPriority := corev1helpers.PodPriority(pod)
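	// Look up whether the preemptor's namespace is governed by an elastic
	// quota; victim selection follows a different policy in each case.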
	preemptorElasticQuotaInfo, preemptorWithElasticQuota := elasticQuotaInfos[pod.Namespace]

	// Sort the pods on the node in ascending order of importance (lowest
	// priority first), so that less important pods are considered as
	// victims first.
	sort.Slice(nodeInfo.Pods, func(i, j int) bool { return !schedutil.MoreImportantPod(nodeInfo.Pods[i].Pod, nodeInfo.Pods[j].Pod) })

	var potentialVictims []*framework.PodInfo
	if preemptorWithElasticQuota {
		nominatedPodsReqInEQWithPodReq = preFilterState.nominatedPodsReqInEQWithPodReq
		nominatedPodsReqWithPodReq = preFilterState.nominatedPodsReqWithPodReq
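		// moreThanMinWithPreemptor is true when admitting the preemptor (on
		// top of the pods already nominated into this quota) would push the
		// quota's usage above its guaranteed min.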
		moreThanMinWithPreemptor := preemptorElasticQuotaInfo.usedOverMinWith(&nominatedPodsReqInEQWithPodReq)
		for _, p := range nodeInfo.Pods {
			eqInfo, withEQ := elasticQuotaInfos[p.Pod.Namespace]
			if !withEQ {
				continue
			}

			if moreThanMinWithPreemptor {
				// If Preemptor.Request + Quota.Used > Quota.Min:
				// the quota's guaranteed (min) resources are not being
				// borrowed by other quotas, so we select pods that belong to
				// the same quota (namespace) and have a lower priority than
				// the preemptor as potential victims on this node.
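				// For example, if Quota.Min is 10 CPU, Quota.Used is 8 CPU and
				// the preemptor requests 4 CPU, then 8+4 > 10 and only
				// lower-priority pods from this same quota are candidates.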
				if p.Pod.Namespace == pod.Namespace && corev1helpers.PodPriority(p.Pod) < podPriority {
					potentialVictims = append(potentialVictims, p)
					if err := removePod(p); err != nil {
						return nil, 0, framework.AsStatus(err)
					}
				}

			} else {
				// If Preemptor.Request + Quota.Used <= Quota.Min:
				// the quota's min (guaranteed) resources are being used or
				// borrowed by other quotas. Potential victims on this node
				// are chosen from quotas that have allocated more than their
				// min, i.e., quotas that are borrowing resources from others.
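				// For example, if Quota.Min is 10 CPU, Quota.Used is 6 CPU and
				// the preemptor requests 2 CPU, the quota stays within its
				// min, so candidates come from other quotas that are
				// currently using more than their own min.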
				if p.Pod.Namespace != pod.Namespace && eqInfo.usedOverMin() {
					potentialVictims = append(potentialVictims, p)
					if err := removePod(p); err != nil {
						return nil, 0, framework.AsStatus(err)
					}
				}
			}
		}
	} else {
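		// The preemptor is not governed by an elastic quota: fall back to
		// plain priority-based victim selection and skip quota-managed pods.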
		for _, p := range nodeInfo.Pods {
			_, withEQ := elasticQuotaInfos[p.Pod.Namespace]
			if withEQ {
				continue
			}
			if corev1helpers.PodPriority(p.Pod) < podPriority {
				potentialVictims = append(potentialVictims, p)
				if err := removePod(p); err != nil {
					return nil, 0, framework.AsStatus(err)
				}
			}
		}
	}

	// No potential victims are found, and so we don't need to evaluate the node again since its state didn't change.
	if len(potentialVictims) == 0 {
		message := fmt.Sprintf("No victims found on node %v for preemptor pod %v", nodeInfo.Node().Name, pod.Name)
		return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, message)
	}

	// If the new pod does not fit after removing all the lower priority pods,
	// we are almost done and this node is not suitable for preemption. The only
	// condition that we could check is if the "pod" is failing to schedule due to
	// inter-pod affinity to one or more victims, but we have decided not to
	// support this case for performance reasons. Having affinity to lower
	// priority pods is not a recommended configuration anyway.
	if s := p.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !s.IsSuccess() {
		return nil, 0, s
	}

	// If the quota.used + pod.request > quota.max or sum(quotas.used) + pod.request > sum(quotas.min)
	// after removing all the lower priority pods,
	// we are almost done and this node is not suitable for preemption.
	if preemptorWithElasticQuota {
		if preemptorElasticQuotaInfo.usedOverMaxWith(&podReq) ||
			elasticQuotaInfos.aggregatedUsedOverMinWith(podReq) {
			return nil, 0, framework.NewStatus(framework.Unschedulable, "global quota max exceeded")
		}
	}

	var victims []*v1.Pod
	numViolatingVictim := 0
	sort.Slice(potentialVictims, func(i, j int) bool {
		return schedutil.MoreImportantPod(potentialVictims[i].Pod, potentialVictims[j].Pod)
	})
	// Try to reprieve as many pods as possible. We first try to reprieve the PDB
	// violating victims and then other non-violating ones. In both cases, we start
	// from the highest priority victims.
	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
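	// reprievePod re-adds a candidate victim to the node; if the preemptor
	// still fits and the elastic quota limits are still respected, the pod
	// is spared, otherwise it is removed again and recorded as a victim.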
	reprievePod := func(pi *framework.PodInfo) (bool, error) {
		if err := addPod(pi); err != nil {
			return false, err
		}
		s := p.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
		fits := s.IsSuccess()
		// Even if the preemptor fits on the node, keeping this pod may push
		// the preemptor's elastic quota over its max or the aggregated usage
		// over the total min; in that case the pod cannot be reprieved either.
		exceedsQuota := preemptorWithElasticQuota &&
			(preemptorElasticQuotaInfo.usedOverMaxWith(&nominatedPodsReqInEQWithPodReq) ||
				elasticQuotaInfos.aggregatedUsedOverMinWith(nominatedPodsReqWithPodReq))
		if !fits || exceedsQuota {
			if err := removePod(pi); err != nil {
				return false, err
			}
			victims = append(victims, pi.Pod)
			klog.V(5).InfoS("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), "node", klog.KObj(nodeInfo.Node()))
		}

		return fits, nil
	}
	for _, pi := range violatingVictims {
		if fits, err := reprievePod(pi); err != nil {
			klog.ErrorS(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod))
			return nil, 0, framework.AsStatus(err)
		} else if !fits {
			numViolatingVictim++
		}
	}
	// Now we try to reprieve non-violating victims.
	for _, pi := range nonViolatingVictims {
		if _, err := reprievePod(pi); err != nil {
			klog.ErrorS(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod))
			return nil, 0, framework.AsStatus(err)
		}
	}
	return victims, numViolatingVictim, framework.NewStatus(framework.Success)
}
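
For context, the elastic quota checks used above (usedOverMinWith, usedOverMaxWith and the aggregated variant) reduce to simple resource arithmetic against the quota's min, max, and current usage. Below is a minimal, self-contained sketch of that arithmetic; the type and field names (resources, quotaSketch) are illustrative assumptions, not the plugin's actual ElasticQuotaInfo implementation, which operates on framework.Resource values and covers more resource dimensions.

package main

import "fmt"

// resources is a simplified stand-in for framework.Resource
// (only CPU and memory are modeled in this sketch).
type resources struct {
	MilliCPU int64
	Memory   int64
}

// quotaSketch is an illustrative elastic quota with guaranteed (min),
// capped (max) and currently consumed (used) resources.
type quotaSketch struct {
	min, max, used resources
}

// usedOverMinWith reports whether adding req would push usage past min.
func (q quotaSketch) usedOverMinWith(req resources) bool {
	return q.used.MilliCPU+req.MilliCPU > q.min.MilliCPU ||
		q.used.Memory+req.Memory > q.min.Memory
}

// usedOverMaxWith reports whether adding req would exceed the quota's max.
func (q quotaSketch) usedOverMaxWith(req resources) bool {
	return q.used.MilliCPU+req.MilliCPU > q.max.MilliCPU ||
		q.used.Memory+req.Memory > q.max.Memory
}

func main() {
	q := quotaSketch{
		min:  resources{MilliCPU: 10000, Memory: 10 << 30},
		max:  resources{MilliCPU: 20000, Memory: 20 << 30},
		used: resources{MilliCPU: 8000, Memory: 4 << 30},
	}
	preemptor := resources{MilliCPU: 4000, Memory: 1 << 30}

	// 8000m + 4000m > 10000m, so the quota would exceed its min: victims are
	// taken from the preemptor's own namespace (the first branch above).
	fmt.Println("over min with preemptor:", q.usedOverMinWith(preemptor)) // true
	// 8000m + 4000m <= 20000m, so the final max check would still pass.
	fmt.Println("over max with preemptor:", q.usedOverMaxWith(preemptor)) // false
}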