func()

in pkg/scheduler/partition.go [677:797]


// removeNodeAllocations removes every allocation still registered on the given node from its
// application and queue, as part of removing that node.
//
// For each allocation returned by node.GetYunikornAllocations() one of the following happens:
//   - the application is unknown, or the application no longer tracks the allocation: the allocation is
//     skipped and not returned, it is assumed to have been removed already;
//   - the allocation is a placeholder with an inflight replacement located on a different node: the
//     replacement is completed, the placeholder is added to the released list and the replacement is
//     added to the confirmed list;
//   - the allocation has an inflight replacement in any other combination: the placeholder and the real
//     allocation are unlinked, the real allocation is deallocated so it can be scheduled again, and the
//     allocation on this node is removed like a normal allocation;
//   - otherwise: the allocation is removed from the application, the queue usage is decreased and the
//     allocation is added to the released list.
//
// The first return value holds the allocations that were removed (released), the second the replacement
// allocations on other nodes that were confirmed. Both are non-nil, possibly empty, slices.
// The partition allocation count is adjusted once at the end by len(confirmed) - len(released).
//
// The node and the applications are not locked while processing, concurrent updates are tolerated by
// skipping anything that can no longer be found.
func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*objects.Allocation, []*objects.Allocation) {
	released := make([]*objects.Allocation, 0)
	confirmed := make([]*objects.Allocation, 0)
	// walk over all allocations still registered for this node
	for _, alloc := range node.GetYunikornAllocations() {
		allocationKey := alloc.GetAllocationKey()
		// Since we are not locking the node and/or application we could have had an update while processing.
		// Note that we do not return the allocation if the app or allocation is not found: we assume that it
		// was already removed.
		app := pc.getApplication(alloc.GetApplicationID())
		if app == nil {
			log.Log(log.SchedPartition).Info("app is not found, skipping while removing the node",
				zap.String("appID", alloc.GetApplicationID()),
				zap.String("nodeID", node.NodeID))
			continue
		}
		// Processing a removal while in the Completing state could race with the state change.
		// Retrieve the queue early before a possible race.
		queue := app.GetQueue()
		// check for an inflight replacement.
		if alloc.HasRelease() {
			// the other side of the replacement: linked to alloc via the release reference
			release := alloc.GetRelease()
			// allocation to update the ask on: this needs to happen on the real alloc never the placeholder
			askAlloc := alloc
			// placeholder gets handled differently from normal
			if alloc.IsPlaceholder() {
				// Check if the real allocation is made on the same node if not we should trigger a confirmation of
				// the replacement. Trigger the replacement only if it is NOT on the same node.
				// If it is on the same node we just keep going as the real allocation will be unlinked as a result of
				// the removal of this placeholder. The ask update will trigger rescheduling later for the real alloc.
				if alloc.GetNodeID() != release.GetNodeID() {
					// ignore the return as that is the same as alloc, the alloc is gone after this call
					_ = app.ReplaceAllocation(allocationKey)
					// we need to check the resources equality: delta = real allocation - placeholder
					delta := resources.Sub(release.GetAllocatedResource(), alloc.GetAllocatedResource())
					// Any negative value in the delta means that at least one of the requested resource in the
					// placeholder is larger than the real allocation. The nodes are correct the queue needs adjusting.
					// The reverse case is handled during allocation.
					if delta.HasNegativeValue() {
						// this looks incorrect but the delta is negative and the result will be a real decrease
						err := queue.TryIncAllocatedResource(delta)
						// this should not happen as we really decrease the value
						if err != nil {
							log.Log(log.SchedPartition).Warn("unexpected failure during queue update: replacing placeholder",
								zap.String("appID", alloc.GetApplicationID()),
								zap.String("placeholderKey", alloc.GetAllocationKey()),
								zap.String("allocationKey", release.GetAllocationKey()),
								zap.Error(err))
						}
						// the size mismatch is always reported, independent of the result of the queue update
						log.Log(log.SchedPartition).Warn("replacing placeholder: placeholder is larger than real allocation",
							zap.String("allocationKey", release.GetAllocationKey()),
							zap.Stringer("requested resource", release.GetAllocatedResource()),
							zap.String("placeholderKey", alloc.GetAllocationKey()),
							zap.Stringer("placeholder resource", alloc.GetAllocatedResource()))
					}
					// track what we confirm on the other node to confirm it in the shim and get it bound
					confirmed = append(confirmed, release)
					// the allocation is removed so add it to the list that we return
					released = append(released, alloc)
					log.Log(log.SchedPartition).Info("allocation removed from node and replacement confirmed",
						zap.String("nodeID", node.NodeID),
						zap.String("allocationKey", allocationKey),
						zap.String("replacement nodeID", release.GetNodeID()),
						zap.String("replacement allocationKey", release.GetAllocationKey()))
					// The placeholder is fully handled: skip the normal removal below.
					// NOTE(review): this path does not call pc.decPhAllocationCount like the normal removal
					// of a placeholder does, confirm the placeholder count is adjusted elsewhere.
					continue
				}
				// placeholder and real allocation are on the node being removed: the ask to reset is the real one
				askAlloc = release
			}
			// unlink the placeholder and allocation
			release.ClearRelease()
			alloc.ClearRelease()
			// mark ask as unallocated to get it re-scheduled
			_, err := app.DeallocateAsk(askAlloc.GetAllocationKey())
			if err == nil {
				// NOTE(review): "allocationKey" and "replacement allocationKey" log the same value, the key of
				// the other side of the replacement is never logged. Confirm which key was intended.
				log.Log(log.SchedPartition).Info("inflight placeholder replacement reversed due to node removal",
					zap.String("appID", askAlloc.GetApplicationID()),
					zap.String("allocationKey", askAlloc.GetAllocationKey()),
					zap.String("nodeID", node.NodeID),
					zap.String("replacement allocationKey", askAlloc.GetAllocationKey()))
			} else {
				// a failure is only logged: the removal of the allocation on this node continues below
				log.Log(log.SchedPartition).Error("node removal: repeat update failure for inflight replacement",
					zap.String("appID", askAlloc.GetApplicationID()),
					zap.String("allocationKey", askAlloc.GetAllocationKey()),
					zap.String("nodeID", node.NodeID),
					zap.Error(err))
			}
		}
		// check allocations on the app: a nil result means the app does not track the allocation (anymore)
		if app.RemoveAllocation(allocationKey, si.TerminationType_UNKNOWN_TERMINATION_TYPE) == nil {
			log.Log(log.SchedPartition).Info("allocation is not found, skipping while removing the node",
				zap.String("allocationKey", allocationKey),
				zap.String("appID", app.ApplicationID),
				zap.String("nodeID", node.NodeID))
			continue
		}
		// release the resources from the queue: a failure is logged and the removal continues
		if err := queue.DecAllocatedResource(alloc.GetAllocatedResource()); err != nil {
			log.Log(log.SchedPartition).Warn("failed to release resources from queue",
				zap.String("appID", alloc.GetApplicationID()),
				zap.Error(err))
		}
		// remove preempted resources
		if alloc.IsPreempted() {
			queue.DecPreemptingResource(alloc.GetAllocatedResource())
		}
		// a removed placeholder also lowers the placeholder allocation count of the partition
		if alloc.IsPlaceholder() {
			pc.decPhAllocationCount(1)
		}

		// the allocation is removed so add it to the list that we return
		released = append(released, alloc)
		metrics.GetQueueMetrics(queue.GetQueuePath()).IncReleasedContainer()
		log.Log(log.SchedPartition).Info("node removal: allocation removed",
			zap.String("nodeID", node.NodeID),
			zap.String("queueName", queue.GetQueuePath()),
			zap.String("appID", app.ApplicationID),
			zap.Stringer("allocation", alloc))
	}
	// track the number of allocations: decrement the released allocation AND increment with the confirmed
	pc.updateAllocationCount(len(confirmed) - len(released))
	return released, confirmed
}