protected Void callInternal()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java [1389:1473]


    /**
     * Runs the fetch-scheduling loop until {@code isShutdown} is set or
     * {@code remainingMaps} reaches zero.
     *
     * <p>Each pass (1) waits on the {@code ShuffleScheduler} monitor while every
     * fetcher slot is taken or no host is pending, (2) waits on the merge manager,
     * and (3) submits fetchers for pending hosts, up to the number of free slots.
     * Once the loop ends, {@code fetcherExecutor} is shut down if it has not been already.
     *
     * @return always {@code null}
     * @throws InterruptedException if the thread is interrupted while waiting and
     *     {@code isShutdown} is not set; when it is set, the interrupt flag is
     *     restored and the loop winds down instead
     */
    protected Void callInternal() throws InterruptedException {
      while (!isShutdown.get() && remainingMaps.get() > 0) {
        synchronized (ShuffleScheduler.this) {
          // Park until a fetcher slot is free and a host is pending, or no maps remain.
          for (;;) {
            boolean nothingToSchedule =
                runningFetchers.size() >= numFetchers || pendingHosts.isEmpty();
            if (!nothingToSchedule || remainingMaps.get() <= 0) {
              break;
            }
            try {
              waitAndNotifyProgress();
            } catch (InterruptedException interrupted) {
              if (!isShutdown.get()) {
                throw interrupted;
              }
              LOG.info(srcNameTrimmed + ": " +
                  "Interrupted while waiting for fetchers to complete " +
                  "and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
              Thread.currentThread().interrupt();
              // Exits the wait loop only; the isShutdown checks further down end the pass.
              break;
            }
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("{}: NumCompletedInputs: {}", srcNameTrimmed, (numInputs - remainingMaps.get()));
        }

        // Do not schedule another Fetcher until the merge manager stops blocking.
        try {
          // Blocks while an in-memory merge is on.
          mergeManager.waitForInMemoryMerge();
          // Blocks while usedMemory > memorylimit, until some memory is released.
          mergeManager.waitForShuffleToMergeMemory();
        } catch (InterruptedException interrupted) {
          if (!isShutdown.get()) {
            throw interrupted;
          }
          LOG.info(srcNameTrimmed + ": " +
              "Interrupted while waiting for merge to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
          Thread.currentThread().interrupt();
          break;
        }

        if (isShutdown.get() || remainingMaps.get() <= 0) {
          // Nothing left to schedule; let the outer condition decide whether to exit.
          continue;
        }

        synchronized (ShuffleScheduler.this) {
          final int freeSlots = numFetchers - runningFetchers.size();
          int launched = 0;
          while (launched < freeSlots && !isShutdown.get() && remainingMaps.get() > 0) {
            MapHost host;
            try {
              host = getHost();  // Leads to a wait.
            } catch (InterruptedException interrupted) {
              if (!isShutdown.get()) {
                throw interrupted;
              }
              LOG.info(srcNameTrimmed + ": " +
                  "Interrupted while waiting for host and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
              Thread.currentThread().interrupt();
              break;
            }
            if (host == null) {
              // No host handed out; go back and re-check the exit condition.
              break;
            }
            LOG.debug("{}: Processing pending host: {}", srcNameTrimmed, host);
            if (isShutdown.get()) {
              // Shutdown raced in after the host was obtained; the loop condition ends this pass.
              continue;
            }
            launched++;
            if (LOG.isDebugEnabled()) {
              String hostAndPartition = host.getHostIdentifier() + ":" + host.getPartitionId();
              LOG.debug(srcNameTrimmed + ": " + "Scheduling fetch for inputHost: {}",
                  hostAndPartition);
            }
            // Register the fetcher as running before submitting it to the executor.
            FetcherOrderedGrouped fetcher = constructFetcherForHost(host);
            runningFetchers.add(fetcher);
            ListenableFuture<Void> fetchResult = fetcherExecutor.submit(fetcher);
            Futures.addCallback(fetchResult, new FetchFutureCallback(fetcher), GuavaShim.directExecutor());
          }
        }
      }

      LOG.info("Shutting down FetchScheduler for input: {}, wasInterrupted={}", srcNameTrimmed, Thread.currentThread().isInterrupted());
      if (!fetcherExecutor.isShutdown()) {
        fetcherExecutor.shutdownNow();
      }
      return null;
    }