protected Void callInternal()

in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java [1669:1789]


    protected Void callInternal()
        throws IOException, InterruptedException, TezException, RssException {
      while (!isShutdown.get() && !isAllInputFetched()) {
        LOG.info("Now allInputTaskAttemptDone: " + allInputTaskAttemptDone());

        synchronized (RssShuffleScheduler.this) {
          while (!allInputTaskAttemptDone()
              || ((rssRunningFetchers.size() >= numFetchers || pendingHosts.isEmpty())
                  && !isAllInputFetched())) {
            try {
              LOG.info(
                  "RssShuffleSchedulerCallable, wait pending hosts, pendingHosts:{}.",
                  pendingHosts.isEmpty());
              waitAndNotifyProgress();
            } catch (InterruptedException e) {
              if (isShutdown.get()) {
                LOG.info(
                    srcNameTrimmed
                        + ": "
                        + "Interrupted while waiting for fetchers to complete"
                        + "and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
                Thread.currentThread().interrupt();
                break;
              } else {
                throw e;
              }
            }
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("{}: NumCompletedInputs: {}", srcNameTrimmed, numInputs - remainingMaps.get());
        }
        // Ensure there's memory available before scheduling the next Fetcher.
        try {
          // If merge is on, block
          mergeManager.waitForInMemoryMerge();
          // In case usedMemory > memorylimit, wait until some memory is released
          mergeManager.waitForShuffleToMergeMemory();
        } catch (InterruptedException e) {
          if (isShutdown.get()) {
            LOG.info(
                srcNameTrimmed
                    + ": Interrupted while waiting for merge to complete and hasBeenShutdown. "
                    + "Breaking out of ShuffleSchedulerCallable loop");
            Thread.currentThread().interrupt();
            break;
          } else {
            throw e;
          }
        }

        if (!isShutdown.get() && !isAllInputFetched()) {
          synchronized (RssShuffleScheduler.this) {
            int numFetchersToRun = numFetchers - rssRunningFetchers.size();
            int count = 0;
            while (count < numFetchersToRun && !isShutdown.get() && !isAllInputFetched()) {
              MapHost mapHost;
              try {
                mapHost = getHost(); // Leads to a wait.
              } catch (InterruptedException e) {
                if (isShutdown.get()) {
                  LOG.info(
                      srcNameTrimmed
                          + ": Interrupted while waiting for host and hasBeenShutdown. "
                          + "Breaking out of ShuffleSchedulerCallable loop");
                  Thread.currentThread().interrupt();
                  break;
                } else {
                  throw e;
                }
              }
              if (mapHost == null) {
                LOG.info("Get null mapHost and break out.");
                break; // Check for the exit condition.
              }
              if (LOG.isDebugEnabled()) {
                LOG.debug("{}: Processing pending host: {}", srcNameTrimmed, mapHost.toString());
              }
              if (!isShutdown.get()) {
                count++;
                if (LOG.isDebugEnabled()) {
                  LOG.debug(
                      "{}: Scheduling fetch for inputHost: {}",
                      srcNameTrimmed,
                      mapHost.getHostIdentifier() + ":" + mapHost.getPartitionId());
                }

                if (isFirstRssPartitionFetch(mapHost)) {
                  int partitionId = mapHost.getPartitionId();
                  RssTezShuffleDataFetcher rssTezShuffleDataFetcher =
                      constructRssFetcherForPartition(mapHost, partitionToServers.get(partitionId));

                  rssRunningFetchers.add(rssTezShuffleDataFetcher);
                  ListenableFuture<Void> future = fetcherExecutor.submit(rssTezShuffleDataFetcher);
                  Futures.addCallback(
                      future,
                      new FetchFutureCallback(rssTezShuffleDataFetcher),
                      MoreExecutors.directExecutor());
                } else {
                  for (int i = 0; i < mapHost.getAndClearKnownMaps().size(); i++) {
                    remainingMaps.decrementAndGet();
                  }
                  LOG.info(
                      "Partition was fetched, remainingMaps desc, now value:{}",
                      remainingMaps.get());
                }
              }
            }
          }
        }
      }
      LOG.info(
          "Shutting down FetchScheduler for input: {}, wasInterrupted={}",
          srcNameTrimmed,
          Thread.currentThread().isInterrupted());
      if (!fetcherExecutor.isShutdown()) {
        fetcherExecutor.shutdownNow();
      }
      return null;
    }