in pkg/scheduler/partition.go [1450:1569]
// removeAllocation processes one allocation release request coming from the shim.
// It returns the allocations that still need to be communicated back to the shim as
// released, and the confirmed allocation when the release was a placeholder
// replacement (the real allocation that took the placeholder's place).
// A nil release is a no-op. A release without an application ID is routed to the
// foreign-allocation path (removeForeignAllocation) and returns nothing.
func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*objects.Allocation, *objects.Allocation) {
	if release == nil {
		return nil, nil
	}
	appID := release.ApplicationID
	allocationKey := release.GetAllocationKey()
	// no application ID: treated as a foreign allocation, handled separately
	if appID == "" {
		pc.removeForeignAllocation(allocationKey)
		return nil, nil
	}
	app := pc.getApplication(appID)
	// no app nothing to do everything should already be clean
	if app == nil {
		log.Log(log.SchedPartition).Info("Application not found while releasing allocation",
			zap.String("appID", appID),
			zap.String("allocationId", allocationKey),
			zap.Stringer("terminationType", release.TerminationType))
		return nil, nil
	}
	// **** DO NOT MOVE **** this must be called before any allocations are released.
	// Processing a removal while in the Completing state could race with the state change. The race occurs between
	// removing the allocation and updating the queue after node processing. If the state change removes the queue link
	// before we get to updating the queue after the node we leave the resources as allocated on the queue. The queue
	// will always exist at this point. Retrieving the queue now sidesteps this.
	queue := app.GetQueue()
	// remove the allocation(s) from the application's tracking (delegated to processAllocationRelease)
	released := pc.processAllocationRelease(release, app)
	// keep the partition placeholder counter in sync with what was just removed
	pc.updatePhAllocationCount(released)
	total := resources.NewResource()           // aggregate to deduct from the queue's allocated usage
	totalPreempting := resources.NewResource() // aggregate to deduct from the queue's preempting usage
	var confirmed *objects.Allocation
	// for each allocation to release, update the node and queue.
	for _, alloc := range released {
		node := pc.GetNode(alloc.GetNodeID())
		if node == nil {
			// node no longer exists: skip node/queue accounting for this allocation
			log.Log(log.SchedPartition).Warn("node not found while releasing allocation",
				zap.String("appID", appID),
				zap.String("allocationKey", alloc.GetAllocationKey()),
				zap.String("nodeID", alloc.GetNodeID()))
			continue
		}
		if release.TerminationType == si.TerminationType_PLACEHOLDER_REPLACED {
			// placeholder swap: the linked release on the placeholder is the real allocation
			confirmed = alloc.GetRelease()
			// we need to check the resources equality
			delta := resources.Sub(confirmed.GetAllocatedResource(), alloc.GetAllocatedResource())
			// Any negative value in the delta means that at least one of the requested resource in the
			// placeholder is larger than the real allocation. The node and queue need adjusting.
			// The reverse case is handled during allocation.
			if delta.HasNegativeValue() {
				// This looks incorrect but the delta is negative and the result will be an increase of the
				// total tracked. The total will later be deducted from the queue usage.
				total.SubFrom(delta)
				log.Log(log.SchedPartition).Warn("replacing placeholder: placeholder is larger than real allocation",
					zap.String("allocationKey", confirmed.GetAllocationKey()),
					zap.Stringer("requested resource", confirmed.GetAllocatedResource()),
					zap.String("placeholderKey", alloc.GetAllocationKey()),
					zap.Stringer("placeholder resource", alloc.GetAllocatedResource()))
			}
			// replacements could be on a different node and different size handle all cases
			if confirmed.GetNodeID() == alloc.GetNodeID() {
				// this is the real swap on the node, adjust usage if needed
				node.ReplaceAllocation(alloc.GetAllocationKey(), confirmed, delta)
			} else {
				// we have already added the real allocation to the new node, just remove the placeholder
				node.RemoveAllocation(alloc.GetAllocationKey())
			}
			log.Log(log.SchedPartition).Info("replacing placeholder allocation on node",
				zap.String("nodeID", alloc.GetNodeID()),
				zap.String("allocationKey", alloc.GetAllocationKey()),
				zap.String("allocation nodeID", confirmed.GetNodeID()))
		} else if node.RemoveAllocation(alloc.GetAllocationKey()) != nil {
			// all non replacement are real removes: must update the queue usage
			total.AddTo(alloc.GetAllocatedResource())
			log.Log(log.SchedPartition).Info("removing allocation from node",
				zap.String("nodeID", alloc.GetNodeID()),
				zap.String("allocationKey", alloc.GetAllocationKey()))
		}
		if alloc.IsPreempted() {
			totalPreempting.AddTo(alloc.GetAllocatedResource())
		}
	}
	// deduct the aggregated released resources from the queue in a single call;
	// failure here is logged only — node state has already been updated above
	if resources.StrictlyGreaterThanZero(total) {
		if err := queue.DecAllocatedResource(total); err != nil {
			log.Log(log.SchedPartition).Warn("failed to release resources from queue",
				zap.String("appID", appID),
				zap.String("allocationKey", allocationKey),
				zap.Error(err))
		}
	}
	if resources.StrictlyGreaterThanZero(totalPreempting) {
		queue.DecPreemptingResource(totalPreempting)
	}
	// if confirmed is set we can assume there will just be one alloc in the released
	// that allocation was already released by the shim, so clean up released
	if confirmed != nil {
		released = nil
	}
	// track the number of allocations, when we replace the result is no change
	if allocReleases := len(released); allocReleases > 0 {
		pc.updateAllocationCount(-allocReleases)
		metrics.GetQueueMetrics(queue.GetQueuePath()).AddReleasedContainers(allocReleases)
	}
	// if the termination type is TIMEOUT/PREEMPTED_BY_SCHEDULER, we don't notify the shim,
	// because the release that is processed now is a confirmation returned by the shim to the core
	if release.TerminationType == si.TerminationType_TIMEOUT || release.TerminationType == si.TerminationType_PREEMPTED_BY_SCHEDULER {
		released = nil
	}
	// NOTE(review): asks are removed for every termination type except TIMEOUT —
	// presumably a placeholder timeout must keep the ask alive; confirm against callers
	if release.TerminationType != si.TerminationType_TIMEOUT {
		// handle ask releases as well
		_ = app.RemoveAllocationAsk(allocationKey)
	}
	return released, confirmed
}