in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java [1798:1876]
/**
 * Builds the fetcher that reads one partition's shuffle data from the RSS shuffle servers.
 *
 * <p>The method asks the shuffle servers for the partition's block-id bitmap through a
 * short-lived {@code ShuffleWriteClient}, derives the task-id bitmap from the successful map
 * task attempts recorded for the partition, and wires both into a {@code ShuffleReadClient}
 * that backs the returned fetcher.
 *
 * @param mapHost host descriptor; only its partition id is read here
 * @param shuffleServerInfoList shuffle servers assigned to the partition; de-duplicated into a
 *     set for the block-id lookup, passed as-is to the read client request
 * @return a fetcher bound to the partition and a newly created shuffle read client
 * @throws RssException if the task-id bitmap computed for the partition is empty
 */
private RssTezShuffleDataFetcher constructRssFetcherForPartition(
    MapHost mapHost, List<ShuffleServerInfo> shuffleServerInfoList) throws RssException {
  final int partitionId = mapHost.getPartitionId();
  Set<ShuffleServerInfo> shuffleServerInfoSet = new HashSet<>(shuffleServerInfoList);
  LOG.info("ConstructRssFetcherForPartition, shuffleServerInfoSet: {}", shuffleServerInfoSet);
  // Only used for the log line below; the fetcher picks its own attempt further down.
  Optional<InputAttemptIdentifier> attempt =
      partitionIdToSuccessMapTaskAttempts.get(partitionId).stream().findFirst();
  LOG.info(
      "ConstructRssFetcherForPartition, partitionId:{}, take a attempt:{}",
      partitionId,
      attempt);

  String clientType = "";
  Roaring64NavigableMap blockIdBitmap;
  ShuffleWriteClient writeClient = RssTezUtils.createShuffleClient(conf);
  try {
    blockIdBitmap =
        writeClient.getShuffleResult(
            clientType,
            shuffleServerInfoSet,
            applicationAttemptId.toString(),
            shuffleId,
            partitionId);
  } finally {
    // Release the client even when the lookup fails; previously it leaked on exceptions.
    writeClient.close();
  }

  int appAttemptId = applicationAttemptId.getAttemptId();
  Roaring64NavigableMap taskIdBitmap =
      RssTezUtils.fetchAllRssTaskIds(
          partitionIdToSuccessMapTaskAttempts.get(partitionId), this.numInputs, appAttemptId);
  LOG.info(
      "In reduce: {}, RSS Tez client has fetched blockIds and taskIds successfully, partitionId:{}.",
      inputContext.getTaskVertexName(),
      partitionId);

  if (taskIdBitmap.isEmpty()) {
    throw new RssException("Construct rss fetcher partition task failed");
  }

  // start fetcher to fetch blocks from RSS servers
  LOG.info(
      "In reduce: {}, Rss Tez client starts to fetch blocks from RSS server",
      inputContext.getTaskVertexName());
  JobConf readerJobConf = getRemoteConf();
  int partitionNum = partitionToServers.size();
  // The expected-task-id filter is switched on only when more than one distinct server is used.
  boolean expectedTaskIdsBitmapFilterEnable = shuffleServerInfoSet.size() > 1;
  CreateShuffleReadClientRequest request =
      new CreateShuffleReadClientRequest(
          applicationAttemptId.toString(),
          shuffleId,
          partitionId,
          basePath,
          partitionNumPerRange,
          partitionNum,
          blockIdBitmap,
          taskIdBitmap,
          shuffleServerInfoList,
          readerJobConf,
          new TezIdHelper(),
          expectedTaskIdsBitmapFilterEnable,
          RssTezConfig.toRssConf(conf));
  ShuffleReadClient shuffleReadClient =
      ShuffleClientFactory.getInstance().createShuffleReadClient(request);
  return new RssTezShuffleDataFetcher(
      partitionIdToSuccessMapTaskAttempts.get(partitionId).iterator().next(),
      partitionId,
      mergeManager,
      inputContext.getCounters(),
      shuffleReadClient,
      blockIdBitmap.getLongCardinality(),
      RssTezConfig.toRssConf(conf),
      exceptionReporter);
}