public void run()

in tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java [1698:1786]


    /**
     * Scheduling loop for delayed (held) containers. Runs until
     * {@code running} becomes false and then releases pending containers.
     *
     * <p>Each iteration:
     * <ol>
     *   <li>if {@code tryAssigningAll} is set, calls {@code doAssignAll()} and
     *       clears the flag;</li>
     *   <li>if {@code delayedContainers} is empty, blocks on this object's
     *       monitor until notified or interrupted;</li>
     *   <li>otherwise inspects the head of {@code delayedContainers}: when its
     *       next schedule time has been reached it is polled and, if still
     *       present in {@code heldContainers}, passed to
     *       {@code assignDelayedContainer}; when not yet due, the loop waits on
     *       this object's monitor for the remaining time.</li>
     * </ol>
     *
     * <p>An {@link InterruptedException} is logged and the loop re-evaluates
     * {@code running}; the interrupt flag is intentionally not re-asserted,
     * because doing so would make the next {@code wait} throw immediately.
     */
    public void run() {
      while(running) {
        // Try assigning all containers if there's a request to do so.
        if (tryAssigningAll) {
          doAssignAll();
          tryAssigningAll = false;
        }

        // Try allocating containers which have timed out.
        // Required since these containers may get assigned without
        // locality at this point.
        if (delayedContainers.peek() == null) {
          try {
            // test only signaling to make TestTaskScheduler work
            if (drainedDelayedContainersForTest != null) {
              drainedDelayedContainersForTest.set(true);
              synchronized (drainedDelayedContainersForTest) {
                drainedDelayedContainersForTest.notifyAll();
              }
            }
            synchronized(this) {
              // Re-check the wake-up conditions while holding the monitor.
              // The emptiness check above ran without the lock, so a
              // notification sent between that check and this point would
              // otherwise be missed and the thread would block indefinitely.
              // NOTE(review): assumes producers update the queue / flag
              // before notifying on this monitor - confirm against callers.
              if (running && !tryAssigningAll
                  && delayedContainers.peek() == null) {
                this.wait();
              }
            }
            // Re-loop to see if tryAssignAll is set.
            continue;
          } catch (InterruptedException e) {
            LOG.info("AllocatedContainerManager Thread interrupted");
          }
        } else {
          // test only sleep to prevent tight loop cycling that makes tests stall
          if (drainedDelayedContainersForTest != null) {
            try {
              Thread.sleep(100);
            } catch (InterruptedException e) {
              // Report through the logger like the other interrupt paths
              // instead of dumping the stack trace to stderr.
              LOG.info("AllocatedContainerManager Thread interrupted", e);
            }
          }
          HeldContainer delayedContainer = delayedContainers.peek();
          if (delayedContainer == null) {
            // The queue was drained between the two peeks; start over.
            continue;
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("Considering HeldContainer: "
              + delayedContainer + " for assignment");
          }
          long currentTs = System.currentTimeMillis();
          long nextScheduleTs = delayedContainer.getNextScheduleTime();
          if (currentTs >= nextScheduleTs) {
            // Remove the container and try scheduling it.
            // TEZ-587 what if container is released by RM after this
            // in onContainerCompleted()
            delayedContainer = delayedContainers.poll();
            if (delayedContainer == null) {
              continue;
            }
            Map<CookieContainerRequest, Container> assignedContainers = null;
            synchronized(YarnTaskSchedulerService.this) {
              // Only attempt assignment if the container is still tracked
              // in heldContainers.
              if (null !=
                  heldContainers.get(delayedContainer.getContainer().getId())) {
                assignedContainers = assignDelayedContainer(delayedContainer);
              } else {
                LOG.info("Skipping delayed container as container is no longer"
                  + " running, containerId="
                  + delayedContainer.getContainer().getId());
              }
            }
            // Inform App should be done outside of the lock
            // (assignedContainers stays null when the container was skipped).
            informAppAboutAssignments(assignedContainers);
          } else {
            synchronized(this) {
              try {
                // Wait for the next container to be assignable
                delayedContainer = delayedContainers.peek();
                // Falls back to localitySchedulingDelay when the queue has
                // emptied since the head was inspected.
                long diff = localitySchedulingDelay;
                if (delayedContainer != null) {
                  diff = delayedContainer.getNextScheduleTime() - currentTs;
                }
                // wait(0) would block without a timeout, so only wait for
                // a strictly positive remaining delay.
                if (diff > 0) {
                  this.wait(diff);
                }
              } catch (InterruptedException e) {
                LOG.info("AllocatedContainerManager Thread interrupted");
              }
            }
          }
        }
      }
      releasePendingContainers();
    }