in pkg/scheduler/partition.go [677:797]
func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*objects.Allocation, []*objects.Allocation) {
	// Detach every allocation still registered on the node from its application and queue.
	// Returns two lists:
	//   - released: allocations removed from their application as part of this node removal
	//   - confirmed: real allocations on another node whose in-flight placeholder replacement was
	//     confirmed because the placeholder lived on this node; the caller must confirm these in the shim
	// The partition allocation count is adjusted by len(confirmed) - len(released).
	released := make([]*objects.Allocation, 0)
	confirmed := make([]*objects.Allocation, 0)
	// walk over all allocations still registered for this node
	for _, alloc := range node.GetYunikornAllocations() {
		allocationKey := alloc.GetAllocationKey()
		// since we are not locking the node and or application we could have had an update while processing
		// note that we do not return the allocation if the app or allocation is not found and assume that it
		// was already removed
		app := pc.getApplication(alloc.GetApplicationID())
		if app == nil {
			log.Log(log.SchedPartition).Info("app is not found, skipping while removing the node",
				zap.String("appID", alloc.GetApplicationID()),
				zap.String("nodeID", node.NodeID))
			continue
		}
		// Processing a removal while in the Completing state could race with the state change.
		// Retrieve the queue early before a possible race.
		queue := app.GetQueue()
		// check for an inflight replacement.
		if alloc.HasRelease() {
			release := alloc.GetRelease()
			// allocation to update the ask on: this needs to happen on the real alloc never the placeholder
			askAlloc := alloc
			// the placeholder side of the link: the linked allocation unless alloc itself is the placeholder
			phAlloc := release
			// placeholder gets handled differently from normal
			if alloc.IsPlaceholder() {
				// Check if the real allocation is made on the same node if not we should trigger a confirmation of
				// the replacement. Trigger the replacement only if it is NOT on the same node.
				// If it is on the same node we just keep going as the real allocation will be unlinked as a result of
				// the removal of this placeholder. The ask update will trigger rescheduling later for the real alloc.
				if alloc.GetNodeID() != release.GetNodeID() {
					// ignore the return as that is the same as alloc, the alloc is gone after this call
					_ = app.ReplaceAllocation(allocationKey)
					// we need to check the resources equality
					delta := resources.Sub(release.GetAllocatedResource(), alloc.GetAllocatedResource())
					// Any negative value in the delta means that at least one of the requested resource in the
					// placeholder is larger than the real allocation. The nodes are correct the queue needs adjusting.
					// The reverse case is handled during allocation.
					if delta.HasNegativeValue() {
						// this looks incorrect but the delta is negative and the result will be a real decrease
						err := queue.TryIncAllocatedResource(delta)
						// this should not happen as we really decrease the value
						if err != nil {
							log.Log(log.SchedPartition).Warn("unexpected failure during queue update: replacing placeholder",
								zap.String("appID", alloc.GetApplicationID()),
								zap.String("placeholderKey", alloc.GetAllocationKey()),
								zap.String("allocationKey", release.GetAllocationKey()),
								zap.Error(err))
						}
						log.Log(log.SchedPartition).Warn("replacing placeholder: placeholder is larger than real allocation",
							zap.String("allocationKey", release.GetAllocationKey()),
							zap.Stringer("requested resource", release.GetAllocatedResource()),
							zap.String("placeholderKey", alloc.GetAllocationKey()),
							zap.Stringer("placeholder resource", alloc.GetAllocatedResource()))
					}
					// The placeholder is gone from the app: keep the queue's preempting resources and the
					// partition placeholder count in line with the normal removal path further down, which
					// this branch skips via the continue below.
					if alloc.IsPreempted() {
						queue.DecPreemptingResource(alloc.GetAllocatedResource())
					}
					pc.decPhAllocationCount(1)
					// track what we confirm on the other node to confirm it in the shim and get is bound
					confirmed = append(confirmed, release)
					// the allocation is removed so add it to the list that we return
					released = append(released, alloc)
					log.Log(log.SchedPartition).Info("allocation removed from node and replacement confirmed",
						zap.String("nodeID", node.NodeID),
						zap.String("allocationKey", allocationKey),
						zap.String("replacement nodeID", release.GetNodeID()),
						zap.String("replacement allocationKey", release.GetAllocationKey()))
					continue
				}
				askAlloc = release
				phAlloc = alloc
			}
			// unlink the placeholder and allocation
			release.ClearRelease()
			alloc.ClearRelease()
			// mark ask as unallocated to get it re-scheduled
			_, err := app.DeallocateAsk(askAlloc.GetAllocationKey())
			if err == nil {
				log.Log(log.SchedPartition).Info("inflight placeholder replacement reversed due to node removal",
					zap.String("appID", askAlloc.GetApplicationID()),
					zap.String("allocationKey", askAlloc.GetAllocationKey()),
					zap.String("nodeID", node.NodeID),
					zap.String("placeholderKey", phAlloc.GetAllocationKey()))
			} else {
				log.Log(log.SchedPartition).Error("node removal: repeat update failure for inflight replacement",
					zap.String("appID", askAlloc.GetApplicationID()),
					zap.String("allocationKey", askAlloc.GetAllocationKey()),
					zap.String("nodeID", node.NodeID),
					zap.Error(err))
			}
		}
		// check allocations on the app
		if app.RemoveAllocation(allocationKey, si.TerminationType_UNKNOWN_TERMINATION_TYPE) == nil {
			log.Log(log.SchedPartition).Info("allocation is not found, skipping while removing the node",
				zap.String("allocationKey", allocationKey),
				zap.String("appID", app.ApplicationID),
				zap.String("nodeID", node.NodeID))
			continue
		}
		if err := queue.DecAllocatedResource(alloc.GetAllocatedResource()); err != nil {
			log.Log(log.SchedPartition).Warn("failed to release resources from queue",
				zap.String("appID", alloc.GetApplicationID()),
				zap.Error(err))
		}
		// remove preempted resources
		if alloc.IsPreempted() {
			queue.DecPreemptingResource(alloc.GetAllocatedResource())
		}
		if alloc.IsPlaceholder() {
			pc.decPhAllocationCount(1)
		}
		// the allocation is removed so add it to the list that we return
		released = append(released, alloc)
		metrics.GetQueueMetrics(queue.GetQueuePath()).IncReleasedContainer()
		log.Log(log.SchedPartition).Info("node removal: allocation removed",
			zap.String("nodeID", node.NodeID),
			zap.String("queueName", queue.GetQueuePath()),
			zap.String("appID", app.ApplicationID),
			zap.Stringer("allocation", alloc))
	}
	// track the number of allocations: decrement the released allocation AND increment with the confirmed
	pc.updateAllocationCount(len(confirmed) - len(released))
	return released, confirmed
}