in tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java [1164:1417]
/**
 * Decides whether resources must be freed for the highest priority pending
 * task request and, if so, frees them.
 *
 * <p>The decision is made while holding this object's monitor and proceeds in
 * phases:
 * <ol>
 * <li>Find the highest priority request in {@code taskRequests} and count the
 * requests at that priority.</li>
 * <li>Return early if that request fits in the currently free resources and
 * it has not been waiting longer than {@code preemptionMaxWaitTime}.</li>
 * <li>If the wait deadline has been crossed, continue only when a lower
 * priority task with a non-matching container signature is running.</li>
 * <li>Release held-but-unassigned containers from
 * {@code delayedContainerManager.delayedContainers}, lowest priority first.</li>
 * <li>If that was not enough, select containers running tasks at the lowest
 * running priority for preemption.</li>
 * </ol>
 * The selected running containers are handed to
 * {@code getContext().preemptContainer(...)} after the monitor is released.
 *
 * <p>The number of requests serviced per call is limited via
 * {@code scaleDownByPreemptionPercentage} and {@code preemptionPercentage}.
 *
 * @return {@code false} only when running tasks were found that could be
 *         preempted but fewer than {@code numHeartbeatsBetweenPreemptions}
 *         heartbeats have elapsed since {@code heartbeatAtLastPreemption};
 *         {@code true} in every other case. NOTE(review): the inline comments
 *         suggest the caller uses this value to decide whether to advance the
 *         last-preemption heartbeat marker - confirm against the caller.
 */
boolean preemptIfNeeded() {
if (preemptionPercentage == 0) {
// turned off
return true;
}
// Filled in under the lock, consumed by the upcall loop after the lock is released.
ContainerId[] preemptedContainers = null;
int numPendingRequestsToService = 0;
synchronized (this) {
Resource freeResources = this.getAvailableResources();
// Always log the periodic state at debug level; otherwise only every 50th heartbeat at info.
if (LOG.isDebugEnabled()) {
LOG.debug(constructPreemptionPeriodicLog(freeResources));
} else {
if (numHeartbeats % 50 == 1) {
LOG.info(constructPreemptionPeriodicLog(freeResources));
}
}
assert freeResources.getMemory() >= 0;
// Phase 1: single pass over the pending requests to find the highest priority
// one and count how many requests share that priority.
CookieContainerRequest highestPriRequest = null;
int numHighestPriRequests = 0;
for(CookieContainerRequest request : taskRequests.values()) {
if(highestPriRequest == null) {
highestPriRequest = request;
numHighestPriRequests = 1;
} else if(isHigherPriority(request.getPriority(),
highestPriRequest.getPriority())){
// strictly higher priority found - restart the count
highestPriRequest = request;
numHighestPriRequests = 1;
} else if (request.getPriority().equals(highestPriRequest.getPriority())) {
numHighestPriRequests++;
}
}
if (highestPriRequest == null) {
// nothing pending
resetHighestWaitingPriority(null);
return true;
}
// reset the wait time when waiting priority changes to prevent carry over of the value
if (highestWaitingRequestPriority == null ||
!highestPriRequest.getPriority().equals(highestWaitingRequestPriority)) {
resetHighestWaitingPriority(highestPriRequest.getPriority());
}
long currTime = System.currentTimeMillis();
// A start time of 0 is treated as "not yet waiting"; start the clock now.
if (highestWaitingRequestWaitStartTime == 0) {
highestWaitingRequestWaitStartTime = currTime;
}
boolean preemptionWaitDeadlineCrossed =
(currTime - highestWaitingRequestWaitStartTime) > preemptionMaxWaitTime ? true : false;
// Phase 2: the request fits in free resources and has not waited too long,
// so no preemption is needed on this call.
if(!preemptionWaitDeadlineCrossed &&
fitsIn(highestPriRequest.getCapability(), freeResources)) {
LOG.debug("{} fits in free resources", highestPriRequest);
if (numHeartbeats % 50 == 1) {
LOG.info(highestPriRequest + " fits in free resources");
}
return true;
}
// Phase 3: the request has waited past the deadline (whether or not it fits).
if (preemptionWaitDeadlineCrossed) {
// check if anything lower priority is running - priority inversion
// this check could have been done earlier but in the common case
// this would be unnecessary since there are usually requests pending
// in the normal case without priority inversion. So do this expensive
// iteration now
boolean lowerPriRunning = false;
for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
Priority taskPriority = lastTaskInfo.getPriority();
Object signature = lastTaskInfo.getCookie().getContainerSignature();
if(isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
// lower priority task is running
if (containerSignatureMatcher.isExactMatch(
highestPriRequest.getCookie().getContainerSignature(),
signature)) {
// exact match with different priorities - not counted as an inversion
continue;
}
lowerPriRunning = true;
break;
}
}
if (!lowerPriRunning) {
// nothing lower priority running
// normal case of many pending request without priority inversion
resetHighestWaitingPriority(null);
return true;
}
LOG.info("Preemption deadline crossed at pri: " + highestPriRequest.getPriority()
+ " numRequests: " + numHighestPriRequests + ". "
+ constructPreemptionPeriodicLog(freeResources));
}
// highest priority request will not fit in existing free resources
// (or has waited past the deadline behind lower priority work)
// free up some more
// TODO this is subject to error wrt RM resource normalization
numPendingRequestsToService = scaleDownByPreemptionPercentage(numHighestPriRequests,
preemptionPercentage);
if (numPendingRequestsToService < 1) {
// nothing to preempt - reset preemption last heartbeat
return true;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to service " + numPendingRequestsToService + " out of total "
+ numHighestPriRequests + " pending requests at pri: "
+ highestPriRequest.getPriority());
}
// Phase 4: try to satisfy the requests by releasing held-but-unassigned
// containers, one per request to service, before touching running tasks.
int newContainersReleased = 0;
for (int i=0; i<numPendingRequestsToService; ++i) {
// This request must have been considered for matching with all existing
// containers when request was made.
Container lowestPriNewContainer = null;
// Scan the delayed containers for the lowest priority new container
// that can be released. Bail out of the whole method if any delayed
// container should still be handled by the assignment loop.
for (HeldContainer heldContainer : delayedContainerManager.delayedContainers) {
if (!heldContainer.isNew()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Reused container exists. Wait for assignment loop to release it. "
+ heldContainer.getContainer().getId());
}
return true;
}
if (heldContainer.geNumAssignmentAttempts() < 3) {
// we haven't tried to assign this container at node/rack/ANY
if (LOG.isDebugEnabled()) {
LOG.debug("Brand new container. Wait for assignment loop to match it. "
+ heldContainer.getContainer().getId());
}
return true;
}
Container container = heldContainer.getContainer();
if (lowestPriNewContainer == null ||
isHigherPriority(lowestPriNewContainer.getPriority(), container.getPriority())){
// there is a lower priority new container
lowestPriNewContainer = container;
}
}
if (lowestPriNewContainer != null) {
LOG.info("Preempting new container: " + lowestPriNewContainer.getId() +
" with priority: " + lowestPriNewContainer.getPriority() +
" to free resource for request: " + highestPriRequest +
" . Current free resources: " + freeResources);
newContainersReleased++;
releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
// We are returning an unused resource back to the RM. The RM thinks it
// has serviced our initial request and will not re-allocate this back
// to us anymore. So we need to ask for this again. If there is no
// outstanding request at that priority then it's fine to not ask again.
// See TEZ-915 for more details
maybeRescheduleContainerAtPriority(lowestPriNewContainer.getPriority());
// come back and free more new containers if needed
continue;
}
}
// Each released container is counted as servicing one pending request.
numPendingRequestsToService -= newContainersReleased;
if (numPendingRequestsToService < 1) {
return true;
}
// there are no reused or new containers to release. try to preempt running containers
// this assert will be a no-op in production but can help identify
// invalid assumptions during testing
assert delayedContainerManager.delayedContainers.isEmpty();
if (!delayedContainerManager.delayedContainers.isEmpty()) {
LOG.warn("Expected delayed containers to be empty. "
+ constructPreemptionPeriodicLog(freeResources));
}
// Phase 5: find the lowest priority among running tasks that are lower
// priority than the request and do not exactly match its container
// signature, and count the running tasks at that priority.
Priority preemptedTaskPriority = null;
int numEntriesAtPreemptedPriority = 0;
for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
Priority taskPriority = lastTaskInfo.getPriority();
Object signature = lastTaskInfo.getCookie().getContainerSignature();
if(!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
// higher or same priority
continue;
}
if (containerSignatureMatcher.isExactMatch(
highestPriRequest.getCookie().getContainerSignature(),
signature)) {
// exact match with different priorities
continue;
}
if (preemptedTaskPriority == null ||
!isHigherPriority(taskPriority, preemptedTaskPriority)) {
// keep the lower priority
if (taskPriority.equals(preemptedTaskPriority)) {
numEntriesAtPreemptedPriority++;
} else {
// this is at a lower priority than existing
// (or the first candidate seen) - restart the count
numEntriesAtPreemptedPriority = 1;
}
preemptedTaskPriority = taskPriority;
}
}
if(preemptedTaskPriority != null) {
// Never preempt more tasks than exist at the chosen priority, nor more
// than the number of pending requests; then apply the percentage again
// and keep the smaller of the old and new targets.
int newNumPendingRequestsToService = scaleDownByPreemptionPercentage(Math.min(
numEntriesAtPreemptedPriority, numHighestPriRequests), preemptionPercentage);
numPendingRequestsToService = Math.min(newNumPendingRequestsToService,
numPendingRequestsToService);
if (numPendingRequestsToService < 1) {
return true;
}
// wait for enough heartbeats since this request became active for preemption
if ((numHeartbeats - heartbeatAtLastPreemption) < numHeartbeatsBetweenPreemptions) {
// stop incrementing lastpreemption heartbeat count
// (this is the only path on which the method returns false)
return false;
}
LOG.info("Trying to service " + numPendingRequestsToService + " out of total "
+ numHighestPriRequests + " pending requests at pri: "
+ highestPriRequest.getPriority() + " by preempting from "
+ numEntriesAtPreemptedPriority + " running tasks at priority: " + preemptedTaskPriority);
// found something to preempt. get others of the same priority
preemptedContainers = new ContainerId[numPendingRequestsToService];
int currIndex = 0;
for (Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
Priority taskPriority = lastTaskInfo.getPriority();
Container container = entry.getValue();
if (preemptedTaskPriority.equals(taskPriority)) {
// taskAllocations map will iterate from oldest to newest assigned containers
// keep the N newest containersIds with the matching priority
// the modulo index wraps around so later matches overwrite earlier ones,
// leaving the last N matches in iteration order
// NOTE(review): unlike the selection pass above, this pass does not
// re-check the container signature - confirm that is intended
preemptedContainers[currIndex++ % numPendingRequestsToService] = container.getId();
}
}
// the app client will be notified after the container is killed
// and we get its completed container status
}
}
// upcall outside locks
if (preemptedContainers != null) {
for(int i=0; i<numPendingRequestsToService; ++i) {
ContainerId cId = preemptedContainers[i];
// slots can be null if fewer matching containers were found than slots allocated
if (cId != null) {
LOG.info("Preempting container: " + cId + " currently allocated to a task.");
getContext().preemptContainer(cId);
}
}
}
return true;
}