public RawKeyValueIterator run()

in client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java [198:286]


  public RawKeyValueIterator run() throws IOException, InterruptedException {

    // get assigned RSS servers
    Set<ShuffleServerInfo> serverInfoSet =
        RssMRUtils.getAssignedServers(rssJobConf, reduceId.getTaskID().getId());
    List<ShuffleServerInfo> serverInfoList = new ArrayList<>();
    for (ShuffleServerInfo server : serverInfoSet) {
      serverInfoList.add(server);
    }

    // just get blockIds from RSS servers
    ShuffleWriteClient writeClient = RssMRUtils.createShuffleClient(mrJobConf);
    Roaring64NavigableMap blockIdBitmap =
        writeClient.getShuffleResult(
            clientType, serverInfoSet, appId, 0, reduceId.getTaskID().getId());
    writeClient.close();

    // get map-completion events to generate RSS taskIDs
    final RssEventFetcher<K, V> eventFetcher =
        new RssEventFetcher<K, V>(
            appAttemptId, reduceId, umbilical, mrJobConf, MAX_EVENTS_TO_FETCH);
    Roaring64NavigableMap taskIdBitmap = eventFetcher.fetchAllRssTaskIds();

    LOG.info(
        "In reduce: " + reduceId + ", RSS MR client has fetched blockIds and taskIds successfully");

    // start fetcher to fetch blocks from RSS servers
    if (!taskIdBitmap.isEmpty()) {
      LOG.info("In reduce: " + reduceId + ", Rss MR client starts to fetch blocks from RSS server");
      JobConf readerJobConf = getRemoteConf();
      boolean expectedTaskIdsBitmapFilterEnable = serverInfoList.size() > 1;
      CreateShuffleReadClientRequest request =
          new CreateShuffleReadClientRequest(
              appId,
              0,
              reduceId.getTaskID().getId(),
              basePath,
              partitionNumPerRange,
              partitionNum,
              blockIdBitmap,
              taskIdBitmap,
              serverInfoList,
              readerJobConf,
              new MRIdHelper(),
              expectedTaskIdsBitmapFilterEnable,
              RssMRConfig.toRssConf(rssJobConf));
      ShuffleReadClient shuffleReadClient =
          ShuffleClientFactory.getInstance().createShuffleReadClient(request);
      RssFetcher fetcher =
          new RssFetcher(
              mrJobConf,
              reduceId,
              taskStatus,
              merger,
              copyPhase,
              reporter,
              metrics,
              shuffleReadClient,
              blockIdBitmap.getLongCardinality(),
              RssMRConfig.toRssConf(rssJobConf));
      fetcher.fetchAllRssBlocks();
      LOG.info(
          "In reduce: " + reduceId + ", Rss MR client fetches blocks from RSS server successfully");
    }

    copyPhase.complete();
    taskStatus.setPhase(TaskStatus.Phase.SORT);
    reduceTask.statusUpdate(umbilical);

    // Finish the on-going merges...
    RawKeyValueIterator kvIter = null;
    try {
      kvIter = merger.close();
    } catch (Throwable e) {
      throw new Shuffle.ShuffleError("Error while doing final merge ", e);
    }

    // Sanity check
    synchronized (this) {
      if (throwable != null) {
        throw new Shuffle.ShuffleError("error in shuffle in " + throwingThreadName, throwable);
      }
    }

    LOG.info(
        "In reduce: " + reduceId + ", Rss MR client returns sorted data to reduce successfully");

    return kvIter;
  }