public <K, C> ShuffleReader<K, C> getReaderImpl(ShuffleHandle, int, int, int, int, TaskContext, ShuffleReadMetricsReporter, Roaring64NavigableMap)

in client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java [348:436]
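
Builds the reduce-side reader for a range of partitions: it validates the handle, resolves the partition-to-server assignment (from the driver when stage retry or partition reassignment is enabled, otherwise from the handle itself), fetches the committed block IDs from the assigned shuffle servers, and constructs an RssShuffleReader wired to the read metrics and remote storage configuration.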


  public <K, C> ShuffleReader<K, C> getReaderImpl(
      ShuffleHandle handle,
      int startMapIndex,
      int endMapIndex,
      int startPartition,
      int endPartition,
      TaskContext context,
      ShuffleReadMetricsReporter metrics,
      Roaring64NavigableMap taskIdBitmap) {
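    // Only handles produced by this RSS shuffle manager are supported.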
    if (!(handle instanceof RssShuffleHandle)) {
      throw new RssException("Unexpected ShuffleHandle: " + handle.getClass().getName());
    }
    RssShuffleHandle<K, ?, C> rssShuffleHandle = (RssShuffleHandle<K, ?, C>) handle;
    final int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions();
    int shuffleId = rssShuffleHandle.getShuffleId();
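    // Resolve the partition-to-server assignment: fetch it from the driver when stage retry
    // or partition reassignment is enabled, otherwise use the assignment carried by the handle.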
    ShuffleHandleInfo shuffleHandleInfo;
    if (shuffleManagerRpcServiceEnabled && rssStageRetryForWriteFailureEnabled) {
      // In stage retry mode, get the shuffle server list from the driver for this shuffleId.
      shuffleHandleInfo =
          getRemoteShuffleHandleInfoWithStageRetry(
              context.stageId(), context.stageAttemptNumber(), shuffleId, false);
    } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
      // In block retry (partition reassignment) mode, get the shuffle server list from the driver for this shuffleId.
      shuffleHandleInfo =
          getRemoteShuffleHandleInfoWithBlockRetry(
              context.stageId(), context.stageAttemptNumber(), shuffleId, false);
    } else {
      shuffleHandleInfo =
          new SimpleShuffleHandleInfo(
              shuffleId,
              rssShuffleHandle.getPartitionToServers(),
              rssShuffleHandle.getRemoteStorage());
    }
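    // Group the requested partition range by the shuffle server that stores each partition.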
    Map<ShuffleServerInfo, Set<Integer>> serverToPartitions =
        getPartitionDataServers(shuffleHandleInfo, startPartition, endPartition);
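    // Query those servers for the block IDs committed for the requested partition range.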
    long start = System.currentTimeMillis();
    Roaring64NavigableMap blockIdBitmap =
        getShuffleResultForMultiPart(
            clientType,
            serverToPartitions,
            rssShuffleHandle.getAppId(),
            shuffleId,
            context.stageAttemptNumber(),
            shuffleHandleInfo.createPartitionReplicaTracking());
    LOG.info(
        "Fetched {} blockIds for shuffleId[{}], startPartition[{}], endPartition[{}] in {} ms",
        blockIdBitmap.getLongCardinality(),
        shuffleId,
        startPartition,
        endPartition,
        System.currentTimeMillis() - start);

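    // Use the reporter passed in by Spark when present; otherwise fall back to the task's own shuffle read metrics.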
    ShuffleReadMetrics readMetrics;
    if (metrics != null) {
      readMetrics = new ReadMetrics(metrics);
    } else {
      readMetrics = context.taskMetrics().shuffleReadMetrics();
    }

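    // Remote storage location and Hadoop configuration used when shuffle data is read from remote storage.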
    final RemoteStorageInfo shuffleRemoteStorageInfo = rssShuffleHandle.getRemoteStorage();
    LOG.info("Shuffle reader using remote storage {}", shuffleRemoteStorageInfo);
    final String shuffleRemoteStoragePath = shuffleRemoteStorageInfo.getPath();
    Configuration readerHadoopConf =
        RssSparkShuffleUtils.getRemoteStorageHadoopConf(sparkConf, shuffleRemoteStorageInfo);

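    // Build the reader; the combined block bitmap is split into per-partition bitmaps for [startPartition, endPartition).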
    return new RssShuffleReader<K, C>(
        startPartition,
        endPartition,
        startMapIndex,
        endMapIndex,
        context,
        rssShuffleHandle,
        shuffleRemoteStoragePath,
        readerHadoopConf,
        partitionNum,
        RssUtils.generatePartitionToBitmap(
            blockIdBitmap, startPartition, endPartition, blockIdLayout),
        taskIdBitmap,
        readMetrics,
        managerClientSupplier,
        RssSparkConfig.toRssConf(sparkConf),
        dataDistributionType,
        shuffleHandleInfo.getAllPartitionServersForReader());
  }
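
A minimal usage sketch, not from the source: a reducer-side caller that already holds the RssShuffleManager, the ShuffleHandle, its TaskContext, a metrics reporter, and the expected task-attempt bitmap obtains a reader for its partition range and drains it. The helper class, the countRecords name, and the 0 / Integer.MAX_VALUE map-index range (Spark 3's convention for "all map outputs") are illustrative assumptions.

import org.apache.spark.TaskContext;
import org.apache.spark.shuffle.RssShuffleManager;
import org.apache.spark.shuffle.ShuffleHandle;
import org.apache.spark.shuffle.ShuffleReadMetricsReporter;
import org.apache.spark.shuffle.ShuffleReader;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

final class RssReaderUsageSketch {

  // Drains the reader for [startPartition, endPartition) and returns how many records it produced.
  static <K, C> long countRecords(
      RssShuffleManager manager,
      ShuffleHandle handle,
      TaskContext context,
      ShuffleReadMetricsReporter metrics,
      Roaring64NavigableMap taskIdBitmap,
      int startPartition,
      int endPartition) {
    ShuffleReader<K, C> reader =
        manager.getReaderImpl(
            handle,
            0,                 // startMapIndex: read every map output (assumed convention)
            Integer.MAX_VALUE, // endMapIndex
            startPartition,
            endPartition,
            context,
            metrics,
            taskIdBitmap);
    scala.collection.Iterator<scala.Product2<K, C>> records = reader.read();
    long count = 0;
    while (records.hasNext()) {
      records.next();
      count++;
    }
    return count;
  }
}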