protected Void callInternal()

in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java [502:655]


    /**
     * Drives fetcher scheduling until the manager is shut down, every input has been fetched, or
     * a shuffle error has been recorded.
     *
     * <p>Each iteration first waits (see {@link #awaitSchedulableState()}) and then, if work
     * remains, submits fetcher tasks for pending partitions (see {@link
     * #submitPendingFetchers()}). When the loop ends, the elapsed time since {@code startTime} is
     * stored in {@code shufflePhaseTime} and {@code fetcherExecutor} is shut down if it has not
     * been already.
     *
     * @return always {@code null}
     * @throws Exception if the thread is interrupted while waiting, or while taking a pending
     *     partition when the manager has not been shut down; other failures from the helpers
     *     propagate unchanged
     */
    protected Void callInternal() throws Exception {
      while (!isShutdown.get() && !isAllInputFetched()) {
        awaitSchedulableState();

        if (shuffleError != null) {
          LOG.warn("Shuffle error.", shuffleError);
          // InputContext has already been informed of a fatal error. Relying on
          // tez to kill the task.
          break;
        }

        LOG.debug("{}: NumCompletedInputs: {}", srcNameTrimmed, numCompletedInputs);

        if (!isAllInputFetched() && !isShutdown.get()) {
          submitPendingFetchers();
        }
      }
      LOG.info("RssShuffleManager numInputs:{}", numInputs);
      shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
      LOG.info(
          "{}: Shutting down FetchScheduler, Was Interrupted: {}",
          srcNameTrimmed,
          Thread.currentThread().isInterrupted());
      if (!fetcherExecutor.isShutdown()) {
        fetcherExecutor.shutdownNow();
      }
      return null;
    }

    /**
     * Blocks, holding {@code lock}, until all inputs have been added and either a fetcher slot is
     * free with a partition pending, or every input has been fetched. Also returns early once
     * {@code isShutdown} is set.
     *
     * <p>The wait is done in steps of at most one second on {@code wakeLoop}; before each step
     * {@code inputContext.notifyProgress()} is called.
     *
     * @throws InterruptedException if the thread is interrupted while awaiting {@code wakeLoop}
     */
    private void awaitSchedulableState() throws InterruptedException {
      lock.lock();
      try {
        LOG.info(
            "numFetchers:{}, shuffleInfoEventsMap.size:{}, numInputs:{}.",
            numFetchers,
            shuffleInfoEventsMap.size(),
            numInputs);
        while (((rssRunningFetchers.size() >= numFetchers || pendingPartition.isEmpty())
                && !isAllInputFetched())
            || !isAllInputAdded()) {
          LOG.info(
              "isAllInputAdded:{}, rssRunningFetchers:{}, numFetchers:{}, pendingPartition:{}, "
                  + "successRssPartitionSet:{}, allRssPartition:{} ",
              isAllInputAdded(),
              rssRunningFetchers,
              numFetchers,
              pendingPartition,
              successRssPartitionSet,
              allRssPartition);

          inputContext.notifyProgress();
          boolean isSignal = wakeLoop.await(1000, TimeUnit.MILLISECONDS);
          if (isSignal) {
            LOG.info("wakeLoop is signal");
          }
          if (isShutdown.get()) {
            LOG.info("is shut down and break");
            break;
          }
        }
        LOG.info(
            "run out of while, is all inputadded:{}, fetched:{}",
            isAllInputAdded(),
            isAllInputFetched());
      } finally {
        lock.unlock();
      }
    }

    /**
     * Drains {@code pendingPartition}, holding {@code lock}, and submits one {@code
     * RssTezFetcherTask} per partition that is neither already fetched nor currently running.
     *
     * <p>Draining stops when the queue is empty, when {@code isShutdown} is set, or once the
     * number of tasks submitted in this pass reaches {@code numFetchers -
     * rssRunningFetchers.size()} as computed on entry.
     *
     * @throws Exception an {@link InterruptedException} from the queue when the manager has not
     *     been shut down, or any failure raised while building or submitting a fetcher task
     */
    private void submitPendingFetchers() throws Exception {
      lock.lock();
      try {
        LOG.info(
            "numFetchers:{},runningFetchers.size():{}.", numFetchers, rssRunningFetchers.size());
        int maxFetchersToRun = numFetchers - rssRunningFetchers.size();
        int count = 0;
        LOG.info("pendingPartition:{}", pendingPartition.peek());
        while (pendingPartition.peek() != null && !isShutdown.get()) {
          Integer partition;
          try {
            partition = pendingPartition.take();
          } catch (InterruptedException e) {
            if (isShutdown.get()) {
              LOG.info(
                  "{}: Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler",
                  srcNameTrimmed);
              // Restore the interrupt flag so the caller can observe it.
              Thread.currentThread().interrupt();
              break;
            }
            throw e;
          }

          LOG.debug("{}: Processing pending partition: {}", srcNameTrimmed, partition);

          boolean needsFetch =
              !successRssPartitionSet.contains(partition)
                  && !runningRssPartitionMap.contains(partition);
          if (isShutdown.get() || !needsFetch) {
            // Reached on shutdown, but also when the partition was already fetched or a
            // fetcher for it is still running.
            LOG.debug(
                "{}: Skipping partition: {} since it is already fetched, already running, "
                    + "or the manager is shut down",
                srcNameTrimmed,
                partition);
            continue;
          }

          runningRssPartitionMap.add(partition);
          LOG.info(
              "generate RssTezFetcherTask, partition:{}, rssWoker:{}, all woker:{}",
              partition,
              partitionToServers.get(partition),
              partitionToServers);

          int maxAttemptNo = RssTezUtils.getMaxAttemptNo(conf);

          RssTezFetcherTask fetcher =
              new RssTezFetcherTask(
                  RssShuffleManager.this,
                  inputContext,
                  conf,
                  inputManager,
                  partition,
                  shuffleId,
                  applicationAttemptId,
                  partitionToInput.get(partition),
                  new HashSet<ShuffleServerInfo>(partitionToServers.get(partition)),
                  rssAllBlockIdBitmapMap,
                  rssSuccessBlockIdBitmapMap,
                  numInputs,
                  partitionToServers.size(),
                  maxAttemptNo);
          // NOTE(review): the fetcher is registered before the shutdown re-check, presumably so
          // a concurrent shutdown can still see it; original ordering kept - confirm.
          rssRunningFetchers.add(fetcher);
          if (isShutdown.get()) {
            LOG.info(
                "{}: hasBeenShutdown,Breaking out of ShuffleScheduler Loop", srcNameTrimmed);
            break;
          }
          ListenableFuture<FetchResult> future =
              fetcherExecutor.submit(fetcher); // add fetcher task
          Futures.addCallback(
              future, new FetchFutureCallback(fetcher), MoreExecutors.directExecutor());
          // The cap is checked after the increment, so one submission always goes through
          // before the limit can stop the pass.
          if (++count >= maxFetchersToRun) {
            break;
          }
        }
      } finally {
        lock.unlock();
      }
    }