private RssTezShuffleDataFetcher constructRssFetcherForPartition()

in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java [1798:1876]


  private RssTezShuffleDataFetcher constructRssFetcherForPartition(
      MapHost mapHost, List<ShuffleServerInfo> shuffleServerInfoList) throws RssException {
    Set<ShuffleServerInfo> shuffleServerInfoSet = new HashSet<>(shuffleServerInfoList);
    LOG.info("ConstructRssFetcherForPartition, shuffleServerInfoSet: {}", shuffleServerInfoSet);

    Optional<InputAttemptIdentifier> attempt =
        partitionIdToSuccessMapTaskAttempts.get(mapHost.getPartitionId()).stream().findFirst();
    LOG.info(
        "ConstructRssFetcherForPartition, partitionId:{}, take a attempt:{}",
        mapHost.getPartitionId(),
        attempt);

    ShuffleWriteClient writeClient = RssTezUtils.createShuffleClient(conf);
    String clientType = "";
    Roaring64NavigableMap blockIdBitmap =
        writeClient.getShuffleResult(
            clientType,
            shuffleServerInfoSet,
            applicationAttemptId.toString(),
            shuffleId,
            mapHost.getPartitionId());
    writeClient.close();

    int appAttemptId = applicationAttemptId.getAttemptId();
    Roaring64NavigableMap taskIdBitmap =
        RssTezUtils.fetchAllRssTaskIds(
            partitionIdToSuccessMapTaskAttempts.get(mapHost.getPartitionId()),
            this.numInputs,
            appAttemptId);

    LOG.info(
        "In reduce: {}, RSS Tez client has fetched blockIds and taskIds successfully, partitionId:{}.",
        inputContext.getTaskVertexName(),
        mapHost.getPartitionId());

    // start fetcher to fetch blocks from RSS servers
    if (!taskIdBitmap.isEmpty()) {
      LOG.info(
          "In reduce: "
              + inputContext.getTaskVertexName()
              + ", Rss Tez client starts to fetch blocks from RSS server");
      JobConf readerJobConf = getRemoteConf();

      int partitionNum = partitionToServers.size();
      boolean expectedTaskIdsBitmapFilterEnable = shuffleServerInfoSet.size() > 1;

      CreateShuffleReadClientRequest request =
          new CreateShuffleReadClientRequest(
              applicationAttemptId.toString(),
              shuffleId,
              mapHost.getPartitionId(),
              basePath,
              partitionNumPerRange,
              partitionNum,
              blockIdBitmap,
              taskIdBitmap,
              shuffleServerInfoList,
              readerJobConf,
              new TezIdHelper(),
              expectedTaskIdsBitmapFilterEnable,
              RssTezConfig.toRssConf(conf));

      ShuffleReadClient shuffleReadClient =
          ShuffleClientFactory.getInstance().createShuffleReadClient(request);
      RssTezShuffleDataFetcher fetcher =
          new RssTezShuffleDataFetcher(
              partitionIdToSuccessMapTaskAttempts.get(mapHost.getPartitionId()).iterator().next(),
              mapHost.getPartitionId(),
              mergeManager,
              inputContext.getCounters(),
              shuffleReadClient,
              blockIdBitmap.getLongCardinality(),
              RssTezConfig.toRssConf(conf),
              exceptionReporter);
      return fetcher;
    }

    throw new RssException("Construct rss fetcher partition task failed");
  }