protected Void callInternal()

in client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornScheduler.java [176:283]


    protected Void callInternal() throws InterruptedException {
      while (!isShutdown.get() && !isAllInputFetched()) {
        synchronized (CelebornScheduler.this) {
          while (!allInputTaskAttemptDone()
              || ((celebornRunningFetchers.size() >= numFetchers || pendingHosts.isEmpty())
                  && !isAllInputFetched())) {
            try {
              waitAndNotifyProgress();
            } catch (InterruptedException e) {
              if (isShutdown.get()) {
                LOG.info(
                    srcNameTrimmed
                        + ": "
                        + "Interrupted while waiting for fetchers to complete "
                        + "and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
                Thread.currentThread().interrupt();
                break;
              } else {
                throw e;
              }
            }
          }
        }

        // Ensure there's memory available before scheduling the next Fetcher.
        try {
          // If merge is on, block
          mergeManager.waitForInMemoryMerge();
          // In case usedMemory > memorylimit, wait until some memory is released
          mergeManager.waitForShuffleToMergeMemory();
        } catch (InterruptedException e) {
          if (isShutdown.get()) {
            LOG.info(
                srcNameTrimmed
                    + ": "
                    + "Interrupted while waiting for merge to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
            Thread.currentThread().interrupt();
            break;
          } else {
            throw e;
          }
        }

        if (!isShutdown.get() && !isAllInputFetched()) {
          synchronized (CelebornScheduler.this) {
            int numFetchersToRun = numFetchers - celebornRunningFetchers.size();
            int count = 0;
            while (count < numFetchersToRun && !isShutdown.get() && !isAllInputFetched()) {
              MapHost mapHost;
              try {
                mapHost = getHost(); // Leads to a wait.
              } catch (InterruptedException e) {
                if (isShutdown.get()) {
                  LOG.info(
                      srcNameTrimmed
                          + ": "
                          + "Interrupted while waiting for host and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
                  Thread.currentThread().interrupt();
                  break;
                } else {
                  throw e;
                }
              }
              if (mapHost == null) {
                break; // Check for the exit condition.
              }
              LOG.debug("{}: Processing pending host: {}", srcNameTrimmed, mapHost);
              if (!isShutdown.get()) {
                count++;
                if (LOG.isDebugEnabled()) {
                  LOG.debug(
                      srcNameTrimmed + ": " + "Scheduling fetch for inputHost: {}",
                      mapHost.getHostIdentifier() + ":" + mapHost.getPartitionId());
                }

                if (isFirstRssPartitionFetch(mapHost)) {
                  CelebornTezShuffleDataFetcher celebornTezShuffleDataFetcher =
                      constructCelebornFetcherForPartition(mapHost);

                  celebornRunningFetchers.add(celebornTezShuffleDataFetcher);
                  ListenableFuture<Void> future =
                      fetcherExecutor.submit(celebornTezShuffleDataFetcher);
                  Futures.addCallback(
                      future,
                      new FetchFutureCallback(celebornTezShuffleDataFetcher),
                      MoreExecutors.directExecutor());
                } else {
                  for (int i = 0; i < mapHost.getAndClearKnownMaps().size(); i++) {
                    remainingMaps.decrementAndGet();
                  }
                  LOG.info(
                      "Partition was fetched, remainingMaps desc, now value:{}",
                      remainingMaps.get());
                }
              }
            }
          }
        }
      }
      LOG.info(
          "Shutting down FetchScheduler for input: {}, wasInterrupted={}",
          srcNameTrimmed,
          Thread.currentThread().isInterrupted());
      if (!fetcherExecutor.isShutdown()) {
        fetcherExecutor.shutdownNow();
      }
      return null;
    }