protected Void callInternal()

in client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/CelebornShuffleManager.java [182:279]


    /**
     * Runs the fetch-scheduling loop until shutdown is requested, all inputs are fetched, or a
     * shuffle error has been recorded.
     *
     * <p>Each pass of the outer loop:
     *
     * <ol>
     *   <li>waits on {@code wakeLoop} (holding {@code lock}) in slices of up to 1000 ms, calling
     *       {@code inputContext.notifyProgress()} before every wait, until all inputs have been
     *       added and either everything is fetched or a fetcher slot and a pending partition are
     *       both available;
     *   <li>stops if {@code shuffleError} is set;
     *   <li>otherwise takes pending partitions (again holding {@code lock}) and submits one {@link
     *       CelebornFetcher} per partition that is neither already fetched nor already running,
     *       up to the number of free fetcher slots computed at the start of the pass.
     * </ol>
     *
     * <p>On exit the elapsed time since {@code startTime} is stored in {@code shufflePhaseTime}
     * and {@code fetcherExecutor} is shut down via {@code shutdownNow()} if it is still running.
     *
     * @return always {@code null}
     * @throws Exception anything thrown while waiting or scheduling; in particular an {@link
     *     InterruptedException} from {@code pendingPartition.take()} is rethrown when shutdown has
     *     not been requested
     */
    protected Void callInternal() throws Exception {
      while (!isShutdown.get() && !isAllInputFetched()) {
        lock.lock();
        try {
          // Keep waiting while not every input has been added yet, or while nothing can be
          // scheduled (fetcher limit reached or no pending partition) and inputs remain unfetched.
          while (((celebornRunningFetchers.size() >= numFetchers || pendingPartition.isEmpty())
                  && !isAllInputFetched())
              || !isAllInputAdded()) {
            inputContext.notifyProgress();
            // The await result (signalled vs. timed out) is irrelevant: the loop condition is
            // re-evaluated either way.
            wakeLoop.await(1000, TimeUnit.MILLISECONDS);
            if (isShutdown.get()) {
              break;
            }
          }
        } finally {
          lock.unlock();
        }

        if (shuffleError != null) {
          // InputContext has already been informed of a fatal error. Relying on
          // tez to kill the task.
          break;
        }

        LOG.debug("{}: NumCompletedInputs: {}", sourceDestNameTrimmed, numCompletedInputs);
        if (!isAllInputFetched() && !isShutdown.get()) {
          lock.lock();
          try {
            // Free fetcher slots at the start of this scheduling pass.
            int maxFetchersToRun = numFetchers - celebornRunningFetchers.size();
            int count = 0;
            while (pendingPartition.peek() != null && !isShutdown.get()) {
              Integer partition;
              try {
                partition = pendingPartition.take();
              } catch (InterruptedException e) {
                if (isShutdown.get()) {
                  LOG.info(
                      "{}: Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop",
                      sourceDestNameTrimmed);
                  // Restore the interrupt flag so it is still visible after leaving the loop.
                  Thread.currentThread().interrupt();
                  break;
                } else {
                  throw e;
                }
              }
              LOG.debug("{}: Processing pending partition: {}", sourceDestNameTrimmed, partition);
              if (!isShutdown.get()
                  && (!successRssPartitionSet.contains(partition)
                      && !runningRssPartitionMap.contains(partition))) {

                runningRssPartitionMap.add(partition);

                CelebornFetcher fetcher = constructFetcherForCeleborn(partition);
                // NOTE(review): the fetcher is registered as running before the shutdown
                // re-check, presumably so a concurrent shutdown can see it -- confirm.
                celebornRunningFetchers.add(fetcher);
                if (isShutdown.get()) {
                  LOG.info(
                      "{}: hasBeenShutdown,Breaking out of ShuffleScheduler Loop",
                      sourceDestNameTrimmed);
                  break;
                }
                ListenableFuture<FetchResult> future = fetcherExecutor.submit(fetcher);
                Futures.addCallback(
                    future, new FetchFutureCallback(fetcher), GuavaShim.directExecutor());
                if (++count >= maxFetchersToRun) {
                  break;
                }
              } else {
                // Reached on shutdown, but also when the partition was already fetched or is
                // currently being fetched; the message names all three causes.
                LOG.debug(
                    "{}: Skipping partition: {} since it is already fetched or running,"
                        + " or shutdown was requested",
                    sourceDestNameTrimmed,
                    partition);
              }
            }
          } finally {
            lock.unlock();
          }
        }
      }
      shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
      LOG.info(
          "{}: Shutting down FetchScheduler, Was Interrupted: {}",
          sourceDestNameTrimmed,
          Thread.currentThread().isInterrupted());
      if (!fetcherExecutor.isShutdown()) {
        fetcherExecutor.shutdownNow();
      }
      return null;
    }