public Void call()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java [211:284]


    /**
     * Scheduler loop that hands pending hosts to fetchers.
     *
     * <p>Each iteration waits on {@code wakeLoop} while all fetcher slots are in
     * use or no host is pending, then drains {@code pendingHosts}, constructing
     * and submitting one fetcher per host that still has pending inputs. The
     * loop ends when every input has completed, when {@code isShutdown} is set,
     * or when {@code shuffleError} has been recorded. On exit the fetcher
     * executor is shut down if it has not been already.
     *
     * @return always {@code null}
     * @throws Exception an {@link InterruptedException} if the thread is
     *         interrupted while waiting on {@code wakeLoop}, or while taking
     *         from {@code pendingHosts} when shutdown has not been flagged; any
     *         exception raised while constructing or submitting a fetcher also
     *         propagates, since nothing here catches it
     */
    public Void call() throws Exception {
      while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
        lock.lock();
        try {
          // Nothing to schedule right now: either every fetcher slot is taken
          // or there is no pending host. Wait once; whatever wakes us up, the
          // checks below and the outer loop condition re-evaluate the state.
          if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) {
            if (numCompletedInputs.get() < numInputs) {
              wakeLoop.await();
            }
          }
        } finally {
          lock.unlock();
        }

        if (shuffleError != null) {
          // InputContext has already been informed of a fatal error. Relying on
          // tez to kill the task.
          break;
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("NumCompletedInputs: " + numCompletedInputs);
        }
        if (numCompletedInputs.get() < numInputs && !isShutdown.get()) {
          lock.lock();
          try {
            // Number of free fetcher slots at this instant.
            // NOTE(review): if this is <= 0 (woken while all slots are busy),
            // the loop below still schedules one fetcher before the count
            // check trips -- confirm whether that is intended.
            int maxFetchersToRun = numFetchers - runningFetchers.size();
            int count = 0;
            while (pendingHosts.peek() != null && !isShutdown.get()) {
              InputHost inputHost = null;
              try {
                inputHost = pendingHosts.take();
              } catch (InterruptedException e) {
                // An interrupt during shutdown is expected; otherwise let it
                // propagate to the caller.
                if (isShutdown.get()) {
                  LOG.info("Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
                  break;
                } else {
                  throw e;
                }
              }
              if (LOG.isDebugEnabled()) {
                LOG.debug("Processing pending host: " + inputHost.toDetailedString());
              }
              if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) {
                LOG.info("Scheduling fetch for inputHost: " + inputHost.getIdentifier());
                Fetcher fetcher = constructFetcherForHost(inputHost);
                runningFetchers.add(fetcher);
                if (isShutdown.get()) {
                  // Shutdown was flagged while the fetcher was being built.
                  // Actually leave the loop, as the message says, instead of
                  // submitting new work after shutdown.
                  LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
                  break;
                }
                ListenableFuture<FetchResult> future = fetcherExecutor
                    .submit(fetcher);
                Futures.addCallback(future, new FetchFutureCallback(fetcher));
                // Stop once the free slots computed above have been used up.
                if (++count >= maxFetchersToRun) {
                  break;
                }
              } else {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Skipping host: " + inputHost.getIdentifier()
                      + " since it has no inputs to process");
                }
              }
            }
          } finally {
            lock.unlock();
          }
        }
      }
      LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
      // TODO NEWTEZ Maybe clean up inputs.
      if (!fetcherExecutor.isShutdown()) {
        fetcherExecutor.shutdownNow();
      }
      return null;
    }