in client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java [348:436]
/**
 * Builds an {@link RssShuffleReader} for a partition range and map-index range of a shuffle.
 *
 * <p>The server assignment for the shuffle is resolved in one of three ways:
 *
 * <ul>
 *   <li>shuffle-manager RPC service enabled and stage retry for write failures enabled: the server
 *       list is requested from the driver with stage-retry semantics;
 *   <li>shuffle-manager RPC service enabled and partition reassignment enabled: the server list is
 *       requested from the driver with block-retry semantics;
 *   <li>otherwise: a {@link SimpleShuffleHandleInfo} is built locally from the partition-to-server
 *       mapping and remote storage carried by the handle.
 * </ul>
 *
 * <p>The block IDs of the requested partitions are then collected from the assigned servers. The
 * fetch time is logged, and the block IDs are split per partition before being handed to the
 * reader.
 *
 * @param handle the shuffle handle; must be an {@link RssShuffleHandle}
 * @param startMapIndex start of the map-index range, passed through to the reader
 * @param endMapIndex end of the map-index range, passed through to the reader
 * @param startPartition first partition of the range to read
 * @param endPartition end of the partition range to read
 * @param context task context of the reading task
 * @param metrics optional metrics reporter; when {@code null}, the task's own shuffle read
 *     metrics are used instead
 * @param taskIdBitmap bitmap passed through unchanged to the reader
 * @return a reader over the requested partitions
 * @throws RssException if {@code handle} is not an {@link RssShuffleHandle}
 */
public <K, C> ShuffleReader<K, C> getReaderImpl(
    ShuffleHandle handle,
    int startMapIndex,
    int endMapIndex,
    int startPartition,
    int endPartition,
    TaskContext context,
    ShuffleReadMetricsReporter metrics,
    Roaring64NavigableMap taskIdBitmap) {
  // Guard clause: only RSS handles carry the server assignment this reader needs.
  if (!(handle instanceof RssShuffleHandle)) {
    throw new RssException("Unexpected ShuffleHandle:" + handle.getClass().getName());
  }
  // The instanceof check above makes the raw-type cast safe; the generic arguments are unchecked.
  @SuppressWarnings("unchecked")
  final RssShuffleHandle<K, ?, C> rssHandle = (RssShuffleHandle<K, ?, C>) handle;
  final int numPartitions = rssHandle.getDependency().partitioner().numPartitions();
  final int shuffleId = rssHandle.getShuffleId();

  // Decide where the server list comes from. Stage retry takes precedence over block retry, and
  // both require the shuffle-manager RPC service.
  final boolean stageRetryMode =
      shuffleManagerRpcServiceEnabled && rssStageRetryForWriteFailureEnabled;
  final boolean blockRetryMode = shuffleManagerRpcServiceEnabled && partitionReassignEnabled;
  final ShuffleHandleInfo handleInfo;
  if (stageRetryMode) {
    // Stage retry: ask the driver for the server list of this shuffleId.
    handleInfo =
        getRemoteShuffleHandleInfoWithStageRetry(
            context.stageId(), context.stageAttemptNumber(), shuffleId, false);
  } else if (blockRetryMode) {
    // Block retry: ask the driver for the server list of this shuffleId.
    handleInfo =
        getRemoteShuffleHandleInfoWithBlockRetry(
            context.stageId(), context.stageAttemptNumber(), shuffleId, false);
  } else {
    // No driver round-trip: use the assignment carried by the handle itself.
    handleInfo =
        new SimpleShuffleHandleInfo(
            shuffleId, rssHandle.getPartitionToServers(), rssHandle.getRemoteStorage());
  }

  // Collect the block IDs of the requested partitions from every server that holds them.
  final Map<ShuffleServerInfo, Set<Integer>> partitionsByServer =
      getPartitionDataServers(handleInfo, startPartition, endPartition);
  final long fetchStartMs = System.currentTimeMillis();
  final Roaring64NavigableMap blockIds =
      getShuffleResultForMultiPart(
          clientType,
          partitionsByServer,
          rssHandle.getAppId(),
          shuffleId,
          context.stageAttemptNumber(),
          handleInfo.createPartitionReplicaTracking());
  final long fetchCostMs = System.currentTimeMillis() - fetchStartMs;
  LOG.info(
      "Get shuffle blockId cost "
          + fetchCostMs
          + " ms, and get "
          + blockIds.getLongCardinality()
          + " blockIds for shuffleId["
          + shuffleId
          + "], startPartition["
          + startPartition
          + "], endPartition["
          + endPartition
          + "]");

  // Prefer the caller-supplied reporter; fall back to the task's own read metrics.
  final ShuffleReadMetrics shuffleReadMetrics =
      metrics != null
          ? new ReadMetrics(metrics)
          : context.taskMetrics().shuffleReadMetrics();

  final RemoteStorageInfo remoteStorage = rssHandle.getRemoteStorage();
  LOG.info("Shuffle reader using remote storage {}", remoteStorage);
  final String remoteStoragePath = remoteStorage.getPath();
  final Configuration hadoopConf =
      RssSparkShuffleUtils.getRemoteStorageHadoopConf(sparkConf, remoteStorage);

  return new RssShuffleReader<K, C>(
      startPartition,
      endPartition,
      startMapIndex,
      endMapIndex,
      context,
      rssHandle,
      remoteStoragePath,
      hadoopConf,
      numPartitions,
      RssUtils.generatePartitionToBitmap(blockIds, startPartition, endPartition, blockIdLayout),
      taskIdBitmap,
      shuffleReadMetrics,
      managerClientSupplier,
      RssSparkConfig.toRssConf(sparkConf),
      dataDistributionType,
      handleInfo.getAllPartitionServersForReader());
}