// Excerpt from pkg/scheduler/partition.go, lines 1450:1569.

// removeAllocation processes a single allocation release request from the shim.
// It removes the matching allocation(s) from the application, node and queue
// bookkeeping, handling placeholder replacement as a special case.
//
// Returns:
//   - the list of allocations that must still be communicated back to the shim
//     (nil when the shim already knows, i.e. replacement, timeout or preemption
//     confirmations), and
//   - the real allocation that replaced a placeholder (nil when this release is
//     not a placeholder replacement).
func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*objects.Allocation, *objects.Allocation) {
	// nothing requested: nothing to release
	if release == nil {
		return nil, nil
	}
	appID := release.ApplicationID
	allocationKey := release.GetAllocationKey()
	// an empty application ID marks a foreign (non YuniKorn managed) allocation;
	// those are tracked separately and never reported back to the shim
	if appID == "" {
		pc.removeForeignAllocation(allocationKey)
		return nil, nil
	}
	app := pc.getApplication(appID)
	// no app nothing to do everything should already be clean
	if app == nil {
		log.Log(log.SchedPartition).Info("Application not found while releasing allocation",
			zap.String("appID", appID),
			zap.String("allocationId", allocationKey),
			zap.Stringer("terminationType", release.TerminationType))
		return nil, nil
	}

	// **** DO NOT MOVE **** this must be called before any allocations are released.
	// Processing a removal while in the Completing state could race with the state change. The race occurs between
	// removing the allocation and updating the queue after node processing. If the state change removes the queue link
	// before we get to updating the queue after the node we leave the resources as allocated on the queue. The queue
	// will always exist at this point. Retrieving the queue now sidesteps this.
	queue := app.GetQueue()

	// release from the application first; this may return multiple allocations
	// (e.g. releasing all allocations of the app) or just the single match
	released := pc.processAllocationRelease(release, app)
	// keep the partition placeholder counters in sync with what was removed
	pc.updatePhAllocationCount(released)

	// total: resources to deduct from the queue once all nodes are processed
	total := resources.NewResource()
	// totalPreempting: portion of the released resources that was marked preempted,
	// to be deducted from the queue's preempting tracker
	totalPreempting := resources.NewResource()
	var confirmed *objects.Allocation
	// for each allocation to release, update the node and queue.
	for _, alloc := range released {
		node := pc.GetNode(alloc.GetNodeID())
		// node already gone (e.g. removed): skip node/queue usage updates for this
		// allocation; the app-side release above has already happened
		if node == nil {
			log.Log(log.SchedPartition).Warn("node not found while releasing allocation",
				zap.String("appID", appID),
				zap.String("allocationKey", alloc.GetAllocationKey()),
				zap.String("nodeID", alloc.GetNodeID()))
			continue
		}
		if release.TerminationType == si.TerminationType_PLACEHOLDER_REPLACED {
			// alloc is the placeholder; its linked release is the real allocation
			confirmed = alloc.GetRelease()
			// we need to check the resources equality
			delta := resources.Sub(confirmed.GetAllocatedResource(), alloc.GetAllocatedResource())
			// Any negative value in the delta means that at least one of the requested resource in the
			// placeholder is larger than the real allocation. The node and queue need adjusting.
			// The reverse case is handled during allocation.
			if delta.HasNegativeValue() {
				// This looks incorrect but the delta is negative and the result will be an increase of the
				// total tracked. The total will later be deducted from the queue usage.
				total.SubFrom(delta)
				log.Log(log.SchedPartition).Warn("replacing placeholder: placeholder is larger than real allocation",
					zap.String("allocationKey", confirmed.GetAllocationKey()),
					zap.Stringer("requested resource", confirmed.GetAllocatedResource()),
					zap.String("placeholderKey", alloc.GetAllocationKey()),
					zap.Stringer("placeholder resource", alloc.GetAllocatedResource()))
			}
			// replacements could be on a different node and different size handle all cases
			if confirmed.GetNodeID() == alloc.GetNodeID() {
				// this is the real swap on the node, adjust usage if needed
				node.ReplaceAllocation(alloc.GetAllocationKey(), confirmed, delta)
			} else {
				// we have already added the real allocation to the new node, just remove the placeholder
				node.RemoveAllocation(alloc.GetAllocationKey())
			}
			log.Log(log.SchedPartition).Info("replacing placeholder allocation on node",
				zap.String("nodeID", alloc.GetNodeID()),
				zap.String("allocationKey", alloc.GetAllocationKey()),
				zap.String("allocation nodeID", confirmed.GetNodeID()))
		} else if node.RemoveAllocation(alloc.GetAllocationKey()) != nil {
			// all non replacement are real removes: must update the queue usage
			total.AddTo(alloc.GetAllocatedResource())
			log.Log(log.SchedPartition).Info("removing allocation from node",
				zap.String("nodeID", alloc.GetNodeID()),
				zap.String("allocationKey", alloc.GetAllocationKey()))
		}
		if alloc.IsPreempted() {
			totalPreempting.AddTo(alloc.GetAllocatedResource())
		}
	}

	// deduct the accumulated usage from the queue fetched before the release
	// (see the DO NOT MOVE comment above); a failure here is logged, not fatal
	if resources.StrictlyGreaterThanZero(total) {
		if err := queue.DecAllocatedResource(total); err != nil {
			log.Log(log.SchedPartition).Warn("failed to release resources from queue",
				zap.String("appID", appID),
				zap.String("allocationKey", allocationKey),
				zap.Error(err))
		}
	}
	if resources.StrictlyGreaterThanZero(totalPreempting) {
		queue.DecPreemptingResource(totalPreempting)
	}

	// if confirmed is set we can assume there will just be one alloc in the released
	// that allocation was already released by the shim, so clean up released
	if confirmed != nil {
		released = nil
	}
	// track the number of allocations, when we replace the result is no change
	if allocReleases := len(released); allocReleases > 0 {
		pc.updateAllocationCount(-allocReleases)
		metrics.GetQueueMetrics(queue.GetQueuePath()).AddReleasedContainers(allocReleases)
	}

	// if the termination type is TIMEOUT/PREEMPTED_BY_SCHEDULER, we don't notify the shim,
	// because the release that is processed now is a confirmation returned by the shim to the core
	if release.TerminationType == si.TerminationType_TIMEOUT || release.TerminationType == si.TerminationType_PREEMPTED_BY_SCHEDULER {
		released = nil
	}

	if release.TerminationType != si.TerminationType_TIMEOUT {
		// handle ask releases as well
		// NOTE(review): on timeout the ask is presumably kept so the placeholder can
		// still be retried/replaced — confirm against RemoveAllocationAsk callers
		_ = app.RemoveAllocationAsk(allocationKey)
	}

	return released, confirmed
}