in pkg/scheduler/objects/preemption.go [214:348]
// calculateVictimsByNode computes an ordered list of candidate victim allocations on a
// single node which, if preempted, would free enough capacity for the pending ask.
//
// Parameters:
//   - nodeAvailable: currently available resources on the node being evaluated (cloned
//     internally; never mutated).
//   - potentialVictims: allocations on this node that are eligible for preemption,
//     assumed to be pre-sorted by the caller in preferred preemption order.
//
// Returns (index, victims):
//   - (-1, empty slice) if the ask already fits on the node without any preemption
//     (possible when preemption was triggered by queue limits, not node capacity) —
//     the empty non-nil slice signals the node is still a valid preemption target;
//   - (-1, nil) if no viable victim set exists (queue snapshot missing, or the freed
//     resources never become sufficient);
//   - (index >= 0, victims) on success, where index is the position in victims of the
//     first allocation whose removal makes the ask fit on the node.
//
// The algorithm runs two passes over throwaway snapshot copies of queue usage so that
// trial add/remove bookkeeping never touches live scheduler state.
func (p *Preemptor) calculateVictimsByNode(nodeAvailable *resources.Resource, potentialVictims []*Allocation) (int, []*Allocation) {
	nodeCurrentAvailable := nodeAvailable.Clone()
	// Initial check: Will allocation fit on node without preemption? This is possible if preemption was triggered due
	// to queue limits and not node resource limits.
	if nodeCurrentAvailable.FitIn(p.ask.GetAllocatedResource()) {
		// return empty list so this node is considered for preemption
		return -1, make([]*Allocation, 0)
	}
	// Work on disposable copies of the per-queue usage snapshots; both passes mutate them.
	allocationsByQueueSnap := p.duplicateQueueSnapshots()
	// get the current queue snapshot
	askQueue, ok := allocationsByQueueSnap[p.queuePath]
	if !ok {
		log.Log(log.SchedPreemption).Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
		return -1, nil
	}
	// First pass: Check each task to see whether we are able to reduce our shortfall by preempting each
	// task in turn, and filter out tasks which will cause their queue to drop below guaranteed capacity.
	// If a task could be preempted without violating queue constraints, add it to either the 'head' list or the
	// 'tail' list depending on whether the shortfall is reduced. If added to the 'head' list, adjust the node available
	// capacity and the queue guaranteed headroom.
	head := make([]*Allocation, 0)
	tail := make([]*Allocation, 0)
	for _, victim := range potentialVictims {
		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
				// Capture remaining-guaranteed BEFORE the trial removal so we can tell whether the
				// victim's queue was genuinely over its guarantee.
				oldRemaining := queueSnapshot.GetRemainingGuaranteedResource()
				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
				preemptableResource := queueSnapshot.GetPreemptableResource()
				// Did removing this allocation still keep the queue over-allocated?
				// At times, over-allocation happens because of resource types in usage but not defined as guaranteed.
				// So, as an additional check, -ve remaining guaranteed resource before removing the victim means
				// some really useful victim is there.
				// In case of victims densely populated on any specific node, checking/honouring the guaranteed quota on ask or preemptor queue
				// acts as early filtering layer to carry forward only the required victims.
				// For other cases like victims spread over multiple nodes, this doesn't add great value.
				if resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
					(oldRemaining == nil || resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
					// add the current victim into the ask queue
					askQueue.AddAllocation(victim.GetAllocatedResource())
					askQueueNewRemaining := askQueue.GetRemainingGuaranteedResource()
					// Did adding this allocation make the ask queue over - utilized?
					if askQueueNewRemaining != nil && resources.StrictlyGreaterThan(resources.Zero, askQueueNewRemaining) {
						// Roll back both trial mutations. Note this is a break, not a continue:
						// once the ask queue would exceed its guarantee, any further victim can
						// only push it further over, so there is no point scanning the rest.
						askQueue.RemoveAllocation(victim.GetAllocatedResource())
						queueSnapshot.AddAllocation(victim.GetAllocatedResource())
						break
					}
					// check to see if the shortfall on the node has changed
					shortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), nodeCurrentAvailable)
					newAvailable := resources.Add(nodeCurrentAvailable, victim.GetAllocatedResource())
					newShortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), newAvailable)
					if resources.EqualsOrEmpty(shortfall, newShortfall) {
						// shortfall did not change, so task should only be considered as a last resort;
						// undo the trial bookkeeping since this victim is not (yet) counted as preempted
						askQueue.RemoveAllocation(victim.GetAllocatedResource())
						queueSnapshot.AddAllocation(victim.GetAllocatedResource())
						tail = append(tail, victim)
					} else {
						// shortfall was decreased, so we should keep this task on the main list and adjust usage
						// (queueSnapshot and askQueue deliberately keep the trial mutation here)
						nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
						head = append(head, victim)
					}
				} else {
					// removing this allocation would have reduced queue below guaranteed limits, put it back
					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
				}
			}
		}
	}
	// merge lists: shortfall-reducing victims first, last-resort victims appended after
	head = append(head, tail...)
	if len(head) == 0 {
		return -1, nil
	}
	// clone again: pass 2 re-validates the now-fixed ordering against pristine state
	nodeCurrentAvailable = nodeAvailable.Clone()
	allocationsByQueueSnap = p.duplicateQueueSnapshots()
	// get the current queue snapshot (existence check only; the snapshot itself is not used in pass 2)
	_, ok2 := allocationsByQueueSnap[p.queuePath]
	if !ok2 {
		log.Log(log.SchedPreemption).Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
		return -1, nil
	}
	// Second pass: The task ordering can no longer change. For each task, check that queue constraints would not be
	// violated if the task were to be preempted. If so, discard the task. If the task can be preempted, adjust
	// both the node available capacity and the queue headroom. Save the Index within the results of the first task
	// which would reduce the shortfall to zero.
	results := make([]*Allocation, 0)
	index := -1
	for _, victim := range head {
		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
				oldRemaining := queueSnapshot.GetRemainingGuaranteedResource()
				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
				preemptableResource := queueSnapshot.GetPreemptableResource()
				// Did removing this allocation still keep the queue over-allocated?
				// At times, over-allocation happens because of resource types in usage but not defined as guaranteed.
				// So, as an additional check, -ve remaining guaranteed resource before removing the victim means
				// some really useful victim is there.
				// Similar checks could be added even on the ask or preemptor queue to prevent being over utilized.
				if resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
					(oldRemaining == nil || resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
					// removing task does not violate queue constraints, adjust queue and node
					nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
					// check if ask now fits and we haven't had this happen before
					if nodeCurrentAvailable.FitIn(p.ask.GetAllocatedResource()) && index < 0 {
						index = len(results)
					}
					// add victim to results
					results = append(results, victim)
				} else {
					// add back resources
					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
				}
			}
		}
	}
	// check to see if enough resources were freed; index < 0 means even preempting
	// every surviving victim never made the ask fit, so this node is not viable
	if index < 0 {
		return -1, nil
	}
	return index, results
}