func()

in pkg/scheduler/objects/application.go [386:447]


// timeoutPlaceholderProcessing handles the expiry of the application's placeholder timer.
// NOTE(review): presumably invoked from the placeholder timer callback — confirm against the timer setup.
//
// The application lock is held for the whole call. Two cases are handled:
//   - The application is Running or Completing and still has allocated placeholders: every
//     placeholder allocation not yet marked released is marked released and handed to the RM
//     for release with a TIMEOUT termination. Allocations already marked released are left
//     alone; they are counted as "being replaced".
//   - Any other situation: the application is progressed according to its gang scheduling
//     style (Hard fails the application, anything else resumes it). Then all allocations and
//     all not yet allocated requests are marked released, all asks are removed, and the RM is
//     notified for both sets.
//
// In both cases the placeholder timer is cleared before returning.
func (sa *Application) timeoutPlaceholderProcessing() {
	sa.Lock()
	defer sa.Unlock()
	if (sa.IsRunning() || sa.IsCompleting()) && !resources.IsZero(sa.allocatedPlaceholder) {
		// Case 1: the app still holds allocated placeholders, possibly with some of them already being replaced.
		// Just release the placeholders that are not yet marked for release.
		var toRelease []*Allocation
		replacing := 0
		for _, alloc := range sa.getPlaceholderAllocations() {
			// skip over the allocations that are already marked for release, they will be replaced soon
			if alloc.IsReleased() {
				replacing++
				continue
			}
			alloc.SetReleased(true)
			toRelease = append(toRelease, alloc)
		}
		log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated placeholders",
			zap.String("AppID", sa.ApplicationID),
			zap.Int("placeholders being replaced", replacing),
			zap.Int("releasing placeholders", len(toRelease)))
		// trigger the release of the placeholders: accounting updates when the release is done
		sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
	} else {
		// Case 2: in every other case progress the application, and notify the context about the expired placeholders
		// change the status of the app based on gang style: soft resume normal allocations, hard fail the app
		event := ResumeApplication
		if sa.gangSchedulingStyle == Hard {
			event = FailApplication
		}
		if err := sa.HandleApplicationEventWithInfo(event, "ResourceReservationTimeout"); err != nil {
			log.Log(log.SchedApplication).Debug("Application state change failed when placeholder timed out",
				zap.String("AppID", sa.ApplicationID),
				zap.String("currentState", sa.CurrentState()),
				zap.Error(err))
		}
		// all allocations are expected to be placeholders here: release them all
		var toRelease, pendingRelease []*Allocation
		for _, alloc := range sa.allocations {
			alloc.SetReleased(true)
			toRelease = append(toRelease, alloc)
		}
		// get all open requests and remove them all, filter out already allocated as they are already released
		for _, alloc := range sa.requests {
			if alloc.IsAllocated() {
				continue
			}
			alloc.SetReleased(true)
			pendingRelease = append(pendingRelease, alloc)
			// Only count placeholder requests towards the task group's timeout counter, and guard the lookup.
			// placeholderData holds pointers (the original `m[k].TimedOut++` only compiles for pointer values).
			// A task group without an entry therefore yields nil and would panic while the app lock is held.
			if !alloc.IsPlaceholder() {
				continue
			}
			if phData := sa.placeholderData[alloc.taskGroupName]; phData != nil {
				phData.TimedOut++
			}
		}
		log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated and pending placeholders",
			zap.String("AppID", sa.ApplicationID),
			zap.Int("releasing placeholders", len(toRelease)),
			zap.Int("pending placeholders", len(pendingRelease)),
			zap.String("gang scheduling style", sa.gangSchedulingStyle))
		// an empty allocation key removes every outstanding ask
		// NOTE(review): inferred from the call site, confirm against removeAsksInternal
		sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
		// trigger the release of the allocated placeholders: accounting updates when the release is done
		sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
		// trigger the release of the pending placeholders: accounting has been done
		sa.notifyRMAllocationReleased(pendingRelease, si.TerminationType_TIMEOUT, "releasing pending placeholders on placeholder timeout")
	}
	sa.clearPlaceholderTimer()
}