func()

in pkg/scheduler/objects/application.go [517:587]


func (sa *Application) removeAsksInternal(allocKey string, detail si.EventRecord_ChangeDetail) int {
	// shortcut no need to do anything
	if len(sa.requests) == 0 {
		return 0
	}
	var deltaPendingResource *resources.Resource = nil
	// when allocation key is not specified, cleanup all allocations
	var toRelease int
	if allocKey == "" {
		// cleanup all reservations
		for _, reserve := range sa.reservations {
			releases := sa.unReserveInternal(reserve)
			toRelease += releases
		}
		// clean up the queue reservation
		sa.queue.UnReserve(sa.ApplicationID, toRelease)
		// Cleanup total pending resource
		deltaPendingResource = sa.pending
		sa.pending = resources.NewResource()
		for _, ask := range sa.requests {
			sa.appEvents.SendRemoveAskEvent(sa.ApplicationID, ask.allocationKey, ask.GetAllocatedResource(), detail)
		}
		sa.requests = make(map[string]*Allocation)
		sa.sortedRequests = sortedRequests{}
		sa.askMaxPriority = configs.MinPriority
		sa.queue.UpdateApplicationPriority(sa.ApplicationID, sa.askMaxPriority)
	} else {
		// cleanup the reservation for this allocation
		if reserve, ok := sa.reservations[allocKey]; ok {
			releases := sa.unReserveInternal(reserve)
			// clean up the queue reservation
			sa.queue.UnReserve(sa.ApplicationID, releases)
			toRelease += releases
		}
		if ask := sa.requests[allocKey]; ask != nil {
			if !ask.IsAllocated() {
				deltaPendingResource = ask.GetAllocatedResource()
				sa.pending = resources.Sub(sa.pending, deltaPendingResource)
				sa.pending.Prune()
			}
			delete(sa.requests, allocKey)
			sa.sortedRequests.remove(ask)
			sa.appEvents.SendRemoveAskEvent(sa.ApplicationID, ask.allocationKey, ask.GetAllocatedResource(), detail)
			if priority := ask.GetPriority(); priority >= sa.askMaxPriority {
				sa.updateAskMaxPriority()
			}
		}
	}
	// clean up the queue pending resources
	sa.queue.decPendingResource(deltaPendingResource)
	// Check if we need to change state based on the removal:
	// 1) if pending is zero (no more asks left)
	// 2) if confirmed allocations is zero (no real tasks running)
	// Change the state to completing.
	// When the resource trackers are zero we should not expect anything to come in later.
	hasPlaceHolderAllocations := len(sa.getPlaceholderAllocations()) > 0
	if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) && !sa.IsFailing() && !sa.IsCompleting() && !hasPlaceHolderAllocations {
		if err := sa.HandleApplicationEvent(CompleteApplication); err != nil {
			log.Log(log.SchedApplication).Warn("Application state not changed to Completing while updating ask(s)",
				zap.String("currentState", sa.CurrentState()),
				zap.Error(err))
		}
	}

	log.Log(log.SchedApplication).Info("ask removed successfully from application",
		zap.String("appID", sa.ApplicationID),
		zap.String("ask", allocKey),
		zap.Stringer("pendingDelta", deltaPendingResource))

	return toRelease
}