public void close()

in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java [589:654]


  public void close() {
    try {
      if (!isShutdown.getAndSet(true)) {
        try {
          logProgress();
        } catch (Exception e) {
          LOG.warn(
              "Failed log progress while closing, ignoring and continuing shutdown. Message={}",
              e.getMessage());
        }

        // Notify and interrupt the waiting scheduler thread
        synchronized (this) {
          notifyAll();
        }
        // Interrupt the ShuffleScheduler thread only if the close is invoked by another thread.
        // If this is invoked on the same thread, then the shuffleRunner has already complete, and
        // there's
        // no point interrupting it.
        // The interrupt is needed to unblock any merges or waits which may be happening, so that
        // the thread can
        // exit.
        if (shuffleSchedulerThread != null
            && !Thread.currentThread().equals(shuffleSchedulerThread)) {
          shuffleSchedulerThread.interrupt();
        }

        // Interrupt the fetchers.
        for (RssTezShuffleDataFetcher fetcher : rssRunningFetchers) {
          try {
            fetcher.shutDown();
          } catch (Exception e) {
            LOG.warn(
                "Error while shutting down fetcher. Ignoring and continuing shutdown. Message={}",
                e.getMessage());
          }
        }

        // Kill the Referee thread.
        try {
          referee.interrupt();
          referee.join();
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while shutting down referee. Ignoring and continuing shutdown");
          Thread.currentThread().interrupt();
        } catch (Exception e) {
          LOG.warn(
              "Error while shutting down referee. Ignoring and continuing shutdown. Message={}",
              e.getMessage());
        }
      }
    } finally {
      long startTime = System.currentTimeMillis();
      if (!fetcherExecutor.isShutdown()) {
        // Ensure that fetchers respond to cancel request.
        fetcherExecutor.shutdownNow();
      }
      long endTime = System.currentTimeMillis();
      LOG.info(
          "Shutting down fetchers for input: {}, shutdown timetaken: {} ms, "
              + "hasFetcherExecutorStopped: {}",
          srcNameTrimmed,
          (endTime - startTime),
          hasFetcherExecutorStopped());
    }
  }