protected Void callInternal()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java [428:508]


    /**
     * Runs the fetch-scheduling loop: repeatedly waits for a free fetcher slot and a pending
     * host, then turns pending hosts into fetchers and submits them to {@code fetcherExecutor}.
     *
     * <p>The loop ends when {@code isShutdown} is set, when {@code numCompletedInputs} reaches
     * {@code numInputs}, or when {@code shuffleError} is non-null. On a normal exit the elapsed
     * time since {@code startTime} is stored in {@code shufflePhaseTime} and
     * {@code fetcherExecutor} is shut down if it has not been already.
     *
     * @return always {@code null}
     * @throws Exception if the thread is interrupted while waiting on {@code wakeLoop}, or while
     *     taking from {@code pendingHosts} when no shutdown has been requested; also anything
     *     propagated from fetcher construction or submission. In these cases the trailing
     *     bookkeeping and executor shutdown in this method are skipped.
     */
    protected Void callInternal() throws Exception {
      while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
        // Phase 1: block until a fetcher slot is free and a host is pending, or until all
        // inputs are complete / shutdown is requested.
        lock.lock();
        try {
          while ((runningFetchers.size() >= numFetchers || pendingHosts.isEmpty())
              && numCompletedInputs.get() < numInputs) {
            inputContext.notifyProgress();
            // Bounded wait: even if nothing signals wakeLoop, the loop comes back at least
            // once per second to call notifyProgress() and re-evaluate its condition.
            // NOTE(review): 'ret' is never read; presumably it is assigned only to keep
            // "return value ignored" static-analysis checks quiet - confirm before removing.
            boolean ret = wakeLoop.await(1000, TimeUnit.MILLISECONDS);
            if (isShutdown.get()) {
              break;
            }
          }
        } finally {
          lock.unlock();
        }

        if (shuffleError != null) {
          // InputContext has already been informed of a fatal error. Relying on
          // tez to kill the task.
          break;
        }

        LOG.debug("{}: NumCompletedInputs: {}", sourceDestNameTrimmed, numCompletedInputs);
        // Phase 2: start fetchers for pending hosts, bounded by the slots free right now.
        if (numCompletedInputs.get() < numInputs && !isShutdown.get()) {
          lock.lock();
          try {
            int maxFetchersToRun = numFetchers - runningFetchers.size();
            int count = 0;
            while (pendingHosts.peek() != null && !isShutdown.get()) {
              // Assigned exactly once: every path through the catch block breaks or throws.
              final InputHost inputHost;
              try {
                inputHost = pendingHosts.take();
              } catch (InterruptedException e) {
                if (isShutdown.get()) {
                  LOG.info("{}: Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop",
                      sourceDestNameTrimmed);
                  // Restore the interrupt flag so it is still visible after the loop exits.
                  Thread.currentThread().interrupt();
                  break;
                } else {
                  throw e;
                }
              }
              // Guard kept so toDetailedString() is only evaluated when debug is enabled.
              if (LOG.isDebugEnabled()) {
                LOG.debug("{}: Processing pending host: {}", sourceDestNameTrimmed,
                    inputHost.toDetailedString());
              }
              if (inputHost.getNumPendingPartitions() > 0 && !isShutdown.get()) {
                Fetcher fetcher = constructFetcherForHost(inputHost, conf);
                // NOTE(review): the fetcher is registered before the shutdown re-check below,
                // so a fetcher skipped by that check stays in runningFetchers without ever
                // being submitted. Presumably the shutdown path relies on this ordering to
                // find and stop every registered fetcher - confirm before reordering.
                runningFetchers.add(fetcher);
                if (isShutdown.get()) {
                  LOG.info("{}: hasBeenShutdown,Breaking out of ShuffleScheduler Loop",
                      sourceDestNameTrimmed);
                  break;
                }
                ListenableFuture<FetchResult> future = fetcherExecutor
                    .submit(fetcher);
                Futures.addCallback(future, new FetchFutureCallback(fetcher), GuavaShim.directExecutor());
                if (++count >= maxFetchersToRun) {
                  break;
                }
              } else {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("{}: Skipping host: {} since it has no inputs to process",
                      sourceDestNameTrimmed, inputHost.getIdentifier());
                }
              }
            }
          } finally {
            lock.unlock();
          }
        }
      }
      shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
      LOG.info("{}: Shutting down FetchScheduler, Was Interrupted: {}", sourceDestNameTrimmed,
          Thread.currentThread().isInterrupted());
      if (!fetcherExecutor.isShutdown()) {
        fetcherExecutor.shutdownNow();
      }
      return null;
    }