in client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java [560:632]
public <K, C> ShuffleReader<K, C> getReaderImpl(
    ShuffleHandle handle,
    int startMapIndex,
    int endMapIndex,
    int startPartition,
    int endPartition,
    TaskContext context,
    ShuffleReadMetricsReporter metrics,
    Roaring64NavigableMap taskIdBitmap) {
  if (!(handle instanceof RssShuffleHandle)) {
    throw new RssException("Unexpected ShuffleHandle: " + handle.getClass().getName());
  }
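  // Cast to the RSS-specific handle to access the shuffle id, partitioner,
  // and partition -> shuffle server assignment.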
  RssShuffleHandle<K, ?, C> rssShuffleHandle = (RssShuffleHandle<K, ?, C>) handle;
  final int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions();
  int shuffleId = rssShuffleHandle.getShuffleId();
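  // Keep only the partitions in [startPartition, endPartition) and invert the mapping to
  // server -> partitions for the block id lookup below.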
  Map<Integer, List<ShuffleServerInfo>> allPartitionToServers =
      rssShuffleHandle.getPartitionToServers();
  Map<Integer, List<ShuffleServerInfo>> requirePartitionToServers =
      allPartitionToServers.entrySet().stream()
          .filter(x -> x.getKey() >= startPartition && x.getKey() < endPartition)
          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  Map<ShuffleServerInfo, Set<Integer>> serverToPartitions =
      RssUtils.generateServerToPartitions(requirePartitionToServers);
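  // Ask the assigned shuffle servers which block ids were written for these partitions.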
  long start = System.currentTimeMillis();
  Roaring64NavigableMap blockIdBitmap =
      getShuffleResultForMultiPart(
          clientType,
          serverToPartitions,
          rssShuffleHandle.getAppId(),
          shuffleId,
          context.stageAttemptNumber());
  LOG.info(
      "Get shuffle blockId cost "
          + (System.currentTimeMillis() - start)
          + " ms, and get "
          + blockIdBitmap.getLongCardinality()
          + " blockIds for shuffleId["
          + shuffleId
          + "], startPartition["
          + startPartition
          + "], endPartition["
          + endPartition
          + "]");
  ShuffleReadMetrics readMetrics;
  if (metrics != null) {
    readMetrics = new ReadMetrics(metrics);
  } else {
    readMetrics = context.taskMetrics().shuffleReadMetrics();
  }
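  // Resolve the remote storage path and Hadoop configuration used to read shuffle data.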
  final RemoteStorageInfo shuffleRemoteStorageInfo = rssShuffleHandle.getRemoteStorage();
  LOG.info("Shuffle reader using remote storage {}", shuffleRemoteStorageInfo);
  final String shuffleRemoteStoragePath = shuffleRemoteStorageInfo.getPath();
  Configuration readerHadoopConf =
      RssSparkShuffleUtils.getRemoteStorageHadoopConf(sparkConf, shuffleRemoteStorageInfo);
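  // Build the reader with only the block ids that belong to [startPartition, endPartition).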
  return new RssShuffleReader<K, C>(
      startPartition,
      endPartition,
      startMapIndex,
      endMapIndex,
      context,
      rssShuffleHandle,
      shuffleRemoteStoragePath,
      readerHadoopConf,
      partitionNum,
      RssUtils.generatePartitionToBitmap(blockIdBitmap, startPartition, endPartition),
      taskIdBitmap,
      readMetrics,
      RssSparkConfig.toRssConf(sparkConf),
      dataDistributionType);
}