in tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java [1698:1786]
/**
 * Main loop of the delayed-container manager thread.
 *
 * <p>While {@code running} is set, each iteration does the following:
 * <ol>
 *   <li>If {@code tryAssigningAll} has been requested, clears the request and
 *       calls {@code doAssignAll()}.</li>
 *   <li>If {@code delayedContainers} has no head element, signals the
 *       test-only {@code drainedDelayedContainersForTest} hook (when set) and
 *       blocks on this object's monitor until notified.</li>
 *   <li>Otherwise inspects the head of {@code delayedContainers}. When its
 *       next schedule time has been reached, the container is removed from the
 *       queue and, provided it is still present in {@code heldContainers},
 *       passed to {@code assignDelayedContainer}; the resulting assignments are
 *       reported through {@code informAppAboutAssignments} outside the
 *       scheduler lock. When it is not yet due, the thread waits on this
 *       object's monitor for the remaining time.</li>
 * </ol>
 * Once the loop exits, {@code releasePendingContainers()} is invoked.
 */
public void run() {
  while (running) {
    // Try assigning all containers if there's a request to do so.
    if (tryAssigningAll) {
      // Clear the request BEFORE doing the work: a request raised while
      // doAssignAll() is in progress must survive to the next iteration
      // instead of being overwritten once the call returns.
      tryAssigningAll = false;
      doAssignAll();
    }
    // Try allocating containers which have timed out.
    // Required since these containers may get assigned without
    // locality at this point.
    if (delayedContainers.peek() == null) {
      try {
        // test only signaling to make TestTaskScheduler work
        if (drainedDelayedContainersForTest != null) {
          drainedDelayedContainersForTest.set(true);
          synchronized (drainedDelayedContainersForTest) {
            drainedDelayedContainersForTest.notifyAll();
          }
        }
        synchronized (this) {
          // Re-check the wake-up conditions while holding the monitor. The
          // checks at the top of the loop were made without it, so a
          // notification delivered in between would otherwise be missed and
          // this untimed wait could block indefinitely.
          // NOTE(review): assumes notifiers update state before notifying on
          // this monitor - confirm against the callers.
          if (running && !tryAssigningAll && delayedContainers.peek() == null) {
            this.wait();
          }
        }
        // Re-loop to see if tryAssignAll is set.
        continue;
      } catch (InterruptedException e) {
        // NOTE(review): the interrupt status is intentionally not restored;
        // doing so would make every later wait() throw immediately while
        // 'running' is still true. Presumably shutdown also clears 'running'.
        LOG.info("AllocatedContainerManager Thread interrupted");
      }
    } else {
      // test only sleep to prevent tight loop cycling that makes tests stall
      if (drainedDelayedContainersForTest != null) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          // Route through the logger rather than printing to stderr.
          LOG.info("AllocatedContainerManager Thread interrupted during"
              + " test-only sleep", e);
        }
      }
      HeldContainer delayedContainer = delayedContainers.peek();
      if (delayedContainer == null) {
        // The queue was drained between the two peeks; start over.
        continue;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Considering HeldContainer: "
            + delayedContainer + " for assignment");
      }
      long currentTs = System.currentTimeMillis();
      long nextScheduleTs = delayedContainer.getNextScheduleTime();
      if (currentTs >= nextScheduleTs) {
        // Remove the container and try scheduling it.
        // TEZ-587 what if container is released by RM after this
        // in onContainerCompleted()
        delayedContainer = delayedContainers.poll();
        if (delayedContainer == null) {
          continue;
        }
        Map<CookieContainerRequest, Container> assignedContainers = null;
        synchronized (YarnTaskSchedulerService.this) {
          // Only containers still tracked in heldContainers are assigned.
          if (null !=
              heldContainers.get(delayedContainer.getContainer().getId())) {
            assignedContainers = assignDelayedContainer(delayedContainer);
          } else {
            LOG.info("Skipping delayed container as container is no longer"
                + " running, containerId="
                + delayedContainer.getContainer().getId());
          }
        }
        // Inform App should be done outside of the lock
        informAppAboutAssignments(assignedContainers);
      } else {
        synchronized (this) {
          try {
            // Wait for the next container to be assignable
            delayedContainer = delayedContainers.peek();
            // Fall back to the locality delay if the queue emptied meanwhile.
            long diff = localitySchedulingDelay;
            if (delayedContainer != null) {
              diff = delayedContainer.getNextScheduleTime() - currentTs;
            }
            // wait(0) would block without a timeout, so only wait for a
            // strictly positive interval.
            if (diff > 0) {
              this.wait(diff);
            }
          } catch (InterruptedException e) {
            LOG.info("AllocatedContainerManager Thread interrupted");
          }
        }
      }
    }
  }
  releasePendingContainers();
}