in tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java [1027:1160]
/**
 * Tries to make room for the highest-priority outstanding task request when it
 * does not fit in the currently unallocated resources.
 *
 * <p>While holding the scheduler lock this method:
 * <ol>
 *   <li>computes free resources as {@code totalResources - allocatedResources};</li>
 *   <li>picks the highest-priority request from {@code taskRequests}; if there is
 *       none, or it fits in the free resources, nothing is done;</li>
 *   <li>otherwise looks at the idle containers held by the delayed container
 *       manager. It backs off (returns) while any of them is a reused container,
 *       or a new container with fewer than 3 assignment attempts. If all are new
 *       and have been tried, it releases the lowest-priority one and re-sends one
 *       outstanding request at that container's priority (see TEZ-915);</li>
 *   <li>if there are no idle containers, it selects a running task whose priority
 *       is lower than the pending request and whose container signature does not
 *       exactly match the request's, preferring the lowest such priority.</li>
 * </ol>
 * The selected container is passed to {@code appClientDelegate.preemptContainer}
 * only after the lock has been released.
 */
void preemptIfNeeded() {
  ContainerId preemptedContainer = null;
  synchronized (this) {
    Resource freeResources = Resources.subtract(totalResources,
        allocatedResources);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
          " cpu:" + allocatedResources.getVirtualCores() +
          " delayedContainers: " + delayedContainerManager.delayedContainers.size());
    }
    assert freeResources.getMemory() >= 0;

    CookieContainerRequest highestPriRequest = null;
    for (CookieContainerRequest request : taskRequests.values()) {
      if (highestPriRequest == null) {
        highestPriRequest = request;
      } else if (isHigherPriority(request.getPriority(),
          highestPriRequest.getPriority())) {
        highestPriRequest = request;
      }
    }
    if (highestPriRequest == null
        || fitsIn(highestPriRequest.getCapability(), freeResources)) {
      // Nothing pending, or the top request already fits: no preemption needed.
      // Returning here leaves preemptedContainer null, so no upcall is made.
      return;
    }

    // The highest priority request will not fit in the existing free resources,
    // so free up some more.
    // TODO this is subject to error wrt RM resource normalization
    // This request must have been considered for matching with all existing
    // containers when the request was made.

    // Before preempting a running task, check whether an idle (delayed)
    // container can be released instead.
    Container lowestPriNewContainer = null;
    for (HeldContainer heldContainer : delayedContainerManager.delayedContainers) {
      if (!heldContainer.isNew()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Reused container exists. Wait for assignment loop to release it. "
              + heldContainer.getContainer().getId());
        }
        return;
      }
      if (heldContainer.geNumAssignmentAttempts() < 3) {
        // we haven't yet tried to assign this container at node/rack/ANY
        if (LOG.isDebugEnabled()) {
          LOG.debug("Brand new container. Wait for assignment loop to match it. "
              + heldContainer.getContainer().getId());
        }
        return;
      }
      Container container = heldContainer.getContainer();
      if (lowestPriNewContainer == null ||
          isHigherPriority(lowestPriNewContainer.getPriority(), container.getPriority())) {
        // there is a lower priority new container
        lowestPriNewContainer = container;
      }
    }
    if (lowestPriNewContainer != null) {
      LOG.info("Preempting new container: " + lowestPriNewContainer.getId() +
          " with priority: " + lowestPriNewContainer.getPriority() +
          " to free resource for request: " + highestPriRequest +
          " . Current free resources: " + freeResources);
      releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
      // We are returning an unused resource back to the RM. The RM thinks it
      // has serviced our initial request and will not re-allocate this back
      // to us anymore. So we need to ask for this again. If there is no
      // outstanding request at that priority then it's fine to not ask again.
      // See TEZ-915 for more details
      for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) {
        Object task = entry.getKey();
        CookieContainerRequest request = entry.getValue();
        if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
          LOG.info("Resending request for task again: " + task);
          deallocateTask(task, true);
          allocateTask(task, request.getCapability(),
              (request.getNodes() == null ? null :
                  request.getNodes().toArray(new String[request.getNodes().size()])),
              (request.getRacks() == null ? null :
                  request.getRacks().toArray(new String[request.getRacks().size()])),
              request.getPriority(),
              request.getCookie().getContainerSignature(),
              request.getCookie().getAppCookie());
          // deallocateTask/allocateTask may modify taskRequests; stop iterating
          // immediately so the entry-set iterator is never advanced afterwards.
          break;
        }
      }
      return;
    }

    // this assert will be a no-op in production but can help identify
    // invalid assumptions during testing
    assert delayedContainerManager.delayedContainers.isEmpty();
    // there are no reused or new containers to release
    // try to preempt running containers
    Map.Entry<Object, Container> preemptedEntry = null;
    // Priority of the task last run in preemptedEntry's container. Candidates
    // must be compared by this task priority, not by Container#getPriority():
    // the container's own priority may differ from the priority of the task
    // currently running in it, so mixing the two gives an inconsistent ordering.
    Priority preemptedTaskPriority = null;
    for (Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
      HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
      CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
      Priority taskPriority = lastTaskInfo.getPriority();
      Object signature = lastTaskInfo.getCookie().getContainerSignature();
      if (!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
        // higher or same priority
        continue;
      }
      if (containerSignatureMatcher.isExactMatch(
          highestPriRequest.getCookie().getContainerSignature(),
          signature)) {
        // exact match with different priorities
        continue;
      }
      if (preemptedTaskPriority == null ||
          !isHigherPriority(taskPriority, preemptedTaskPriority)) {
        // keep the lower priority, or the one seen later on ties
        preemptedEntry = entry;
        preemptedTaskPriority = taskPriority;
      }
    }
    if (preemptedEntry != null) {
      // found something to preempt
      LOG.info("Preempting task: " + preemptedEntry.getKey() +
          " to free resource for request: " + highestPriRequest +
          " . Current free resources: " + freeResources);
      preemptedContainer = preemptedEntry.getValue().getId();
      // app client will be notified after the container is killed
      // and we get its completed container status
    }
  }
  // upcall outside locks
  if (preemptedContainer != null) {
    appClientDelegate.preemptContainer(preemptedContainer);
  }
}