// calculateVictimsByNode — extracted from pkg/scheduler/objects/preemption.go (lines 214-348)

// calculateVictimsByNode computes an ordered list of victim allocations on a single node whose
// preemption would allow the pending ask to fit, without driving any victim's queue below its
// guaranteed capacity or pushing the ask's own queue over its guarantee.
//
// Returns:
//   - (-1, empty slice) if the ask already fits on the node with no preemption needed
//     (non-nil empty result signals "node is viable as-is");
//   - (-1, nil) if no acceptable victim set exists (or on internal snapshot-lookup failure);
//   - (index, victims) otherwise, where victims is the ordered preemption list and index is the
//     position in that list at which enough resource has been freed for the ask to fit.
//
// The algorithm runs two passes over cloned state so that trial mutations never touch live data:
// pass one orders candidates (shortfall-reducing victims first), pass two re-validates that
// ordering against fresh snapshots and records the fit point.
func (p *Preemptor) calculateVictimsByNode(nodeAvailable *resources.Resource, potentialVictims []*Allocation) (int, []*Allocation) {
	// work on a clone so trial adjustments below don't mutate the caller's view of the node
	nodeCurrentAvailable := nodeAvailable.Clone()

	// Initial check: Will allocation fit on node without preemption? This is possible if preemption was triggered due
	// to queue limits and not node resource limits.
	if nodeCurrentAvailable.FitIn(p.ask.GetAllocatedResource()) {
		// return empty list so this node is considered for preemption
		return -1, make([]*Allocation, 0)
	}

	// clone all queue snapshots; pass one mutates them freely while simulating removals
	allocationsByQueueSnap := p.duplicateQueueSnapshots()
	// get the current queue snapshot
	askQueue, ok := allocationsByQueueSnap[p.queuePath]
	if !ok {
		log.Log(log.SchedPreemption).Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
		return -1, nil
	}

	// First pass: Check each task to see whether we are able to reduce our shortfall by preempting each
	// task in turn, and filter out tasks which will cause their queue to drop below guaranteed capacity.
	// If a task could be preempted without violating queue constraints, add it to either the 'head' list or the
	// 'tail' list depending on whether the shortfall is reduced. If added to the 'head' list, adjust the node available
	// capacity and the queue guaranteed headroom.
	head := make([]*Allocation, 0)
	tail := make([]*Allocation, 0)
	for _, victim := range potentialVictims {
		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
				// capture headroom before the trial removal; a negative value here means the
				// victim's queue was already over its guarantee, i.e. preemption is justified
				oldRemaining := queueSnapshot.GetRemainingGuaranteedResource()
				// trial removal — must be rolled back via AddAllocation on every rejection path below
				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
				preemptableResource := queueSnapshot.GetPreemptableResource()

				// Did removing this allocation still keep the queue over-allocated?
				// At times, over-allocation happens because of resource types in usage but not defined as guaranteed.
				// So, as an additional check, -ve remaining guaranteed resource before removing the victim means
				// some really useful victim is there.
				// In case of victims densely populated on any specific node, checking/honouring the guaranteed quota on ask or preemptor queue
				// acts as early filtering layer to carry forward only the required victims.
				// For other cases like victims spread over multiple nodes, this doesn't add great value.
				if resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
					(oldRemaining == nil || resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
					// add the current victim into the ask queue
					askQueue.AddAllocation(victim.GetAllocatedResource())
					askQueueNewRemaining := askQueue.GetRemainingGuaranteedResource()

					// Did adding this allocation make the ask queue over - utilized?
					if askQueueNewRemaining != nil && resources.StrictlyGreaterThan(resources.Zero, askQueueNewRemaining) {
						// ask queue would exceed its guarantee: undo both trial mutations and stop
						// scanning entirely — further victims would only push it further over
						askQueue.RemoveAllocation(victim.GetAllocatedResource())
						queueSnapshot.AddAllocation(victim.GetAllocatedResource())
						break
					}

					// check to see if the shortfall on the node has changed
					shortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), nodeCurrentAvailable)
					newAvailable := resources.Add(nodeCurrentAvailable, victim.GetAllocatedResource())
					newShortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), newAvailable)
					if resources.EqualsOrEmpty(shortfall, newShortfall) {
						// shortfall did not change, so task should only be considered as a last resort
						askQueue.RemoveAllocation(victim.GetAllocatedResource())
						queueSnapshot.AddAllocation(victim.GetAllocatedResource())
						tail = append(tail, victim)
					} else {
						// shortfall was decreased, so we should keep this task on the main list and adjust usage
						nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
						head = append(head, victim)
					}
				} else {
					// removing this allocation would have reduced queue below guaranteed limits, put it back
					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
				}
			}
		}
	}
	// merge lists: shortfall-reducing victims first, last-resort victims appended after
	head = append(head, tail...)
	if len(head) == 0 {
		return -1, nil
	}

	// clone again: pass one's snapshots are dirty from trial mutations, so pass two
	// re-validates the now-fixed ordering against pristine state
	nodeCurrentAvailable = nodeAvailable.Clone()
	allocationsByQueueSnap = p.duplicateQueueSnapshots()

	// get the current queue snapshot (existence sanity-check only; the value is not needed in this pass)
	_, ok2 := allocationsByQueueSnap[p.queuePath]
	if !ok2 {
		log.Log(log.SchedPreemption).Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
		return -1, nil
	}

	// Second pass: The task ordering can no longer change. For each task, check that queue constraints would not be
	// violated if the task were to be preempted. If so, discard the task. If the task can be preempted, adjust
	// both the node available capacity and the queue headroom. Save the Index within the results of the first task
	// which would reduce the shortfall to zero.
	results := make([]*Allocation, 0)
	index := -1
	for _, victim := range head {
		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
				oldRemaining := queueSnapshot.GetRemainingGuaranteedResource()
				// trial removal against the fresh snapshot; rolled back below if rejected
				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
				preemptableResource := queueSnapshot.GetPreemptableResource()

				// Did removing this allocation still keep the queue over-allocated?
				// At times, over-allocation happens because of resource types in usage but not defined as guaranteed.
				// So, as an additional check, -ve remaining guaranteed resource before removing the victim means
				// some really useful victim is there.
				// Similar checks could be added even on the ask or preemptor queue to prevent being over utilized.
				if resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
					(oldRemaining == nil || resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
					// removing task does not violate queue constraints, adjust queue and node
					nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
					// check if ask now fits and we haven't had this happen before
					if nodeCurrentAvailable.FitIn(p.ask.GetAllocatedResource()) && index < 0 {
						index = len(results)
					}
					// add victim to results
					results = append(results, victim)
				} else {
					// add back resources
					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
				}
			}
		}
	}

	// check to see if enough resources were freed
	if index < 0 {
		// even preempting every candidate never made the ask fit — node is not viable
		return -1, nil
	}

	return index, results
}