in client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java [65:113]
public Roaring64NavigableMap fetchAllRssTaskIds() {
try {
acceptMapCompletionEvents();
} catch (Exception e) {
throw new RssException(
"Reduce: " + reduce + " fails to accept completion events due to: " + e.getMessage());
}
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap mapIndexBitmap = Roaring64NavigableMap.bitmapOf();
String errMsg = "TaskAttemptIDs are inconsistent with map tasks";
for (TaskAttemptID taskAttemptID : successMaps) {
if (!obsoleteMaps.contains(taskAttemptID)) {
long rssTaskId = RssMRUtils.convertTaskAttemptIdToLong(taskAttemptID, appAttemptId);
int mapIndex = taskAttemptID.getTaskID().getId();
// There can be multiple successful attempts on same map task.
// So we only need to accept one of them.
if (!mapIndexBitmap.contains(mapIndex)) {
taskIdBitmap.addLong(rssTaskId);
if (mapIndex < totalMapsCount) {
mapIndexBitmap.addLong(mapIndex);
} else {
LOG.error(taskAttemptID + " has overflowed mapIndex");
throw new IllegalStateException(errMsg);
}
} else {
LOG.warn(taskAttemptID + " is redundant on index: " + mapIndex);
}
} else {
LOG.warn(taskAttemptID + " is successful but cancelled by obsolete event");
}
}
// each map should have only one success attempt
if (mapIndexBitmap.getLongCardinality() != taskIdBitmap.getLongCardinality()) {
throw new IllegalStateException(errMsg);
}
if (tipFailedCount != 0) {
LOG.warn("There are " + tipFailedCount + " tipFailed tasks");
}
if (taskIdBitmap.getLongCardinality() + tipFailedCount != totalMapsCount) {
for (int index = 0; index < totalMapsCount; index++) {
if (!mapIndexBitmap.contains(index)) {
LOG.error("Fail to fetch " + " map task on index: " + index);
}
}
throw new IllegalStateException(errMsg);
}
return taskIdBitmap;
}