in client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java [198:286]
public RawKeyValueIterator run() throws IOException, InterruptedException {
// get assigned RSS servers
Set<ShuffleServerInfo> serverInfoSet =
RssMRUtils.getAssignedServers(rssJobConf, reduceId.getTaskID().getId());
List<ShuffleServerInfo> serverInfoList = new ArrayList<>();
for (ShuffleServerInfo server : serverInfoSet) {
serverInfoList.add(server);
}
// just get blockIds from RSS servers
ShuffleWriteClient writeClient = RssMRUtils.createShuffleClient(mrJobConf);
Roaring64NavigableMap blockIdBitmap =
writeClient.getShuffleResult(
clientType, serverInfoSet, appId, 0, reduceId.getTaskID().getId());
writeClient.close();
// get map-completion events to generate RSS taskIDs
final RssEventFetcher<K, V> eventFetcher =
new RssEventFetcher<K, V>(
appAttemptId, reduceId, umbilical, mrJobConf, MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap taskIdBitmap = eventFetcher.fetchAllRssTaskIds();
LOG.info(
"In reduce: " + reduceId + ", RSS MR client has fetched blockIds and taskIds successfully");
// start fetcher to fetch blocks from RSS servers
if (!taskIdBitmap.isEmpty()) {
LOG.info("In reduce: " + reduceId + ", Rss MR client starts to fetch blocks from RSS server");
JobConf readerJobConf = getRemoteConf();
boolean expectedTaskIdsBitmapFilterEnable = serverInfoList.size() > 1;
CreateShuffleReadClientRequest request =
new CreateShuffleReadClientRequest(
appId,
0,
reduceId.getTaskID().getId(),
basePath,
partitionNumPerRange,
partitionNum,
blockIdBitmap,
taskIdBitmap,
serverInfoList,
readerJobConf,
new MRIdHelper(),
expectedTaskIdsBitmapFilterEnable,
RssMRConfig.toRssConf(rssJobConf));
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssFetcher fetcher =
new RssFetcher(
mrJobConf,
reduceId,
taskStatus,
merger,
copyPhase,
reporter,
metrics,
shuffleReadClient,
blockIdBitmap.getLongCardinality(),
RssMRConfig.toRssConf(rssJobConf));
fetcher.fetchAllRssBlocks();
LOG.info(
"In reduce: " + reduceId + ", Rss MR client fetches blocks from RSS server successfully");
}
copyPhase.complete();
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
} catch (Throwable e) {
throw new Shuffle.ShuffleError("Error while doing final merge ", e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new Shuffle.ShuffleError("error in shuffle in " + throwingThreadName, throwable);
}
}
LOG.info(
"In reduce: " + reduceId + ", Rss MR client returns sorted data to reduce successfully");
return kvIter;
}