in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java [1420:1517]
/**
 * Drains the inputs currently known on {@code host} and returns the batch to fetch from it.
 *
 * <p>Every identifier returned by {@code host.getAndClearKnownMaps()} is examined:
 *
 * <ul>
 *   <li>Identifiers for which {@code inputShouldBeConsumed} returns {@code false} are dropped
 *       (logged at INFO).
 *   <li>The remaining identifiers are grouped by input index and de-duplicated. The first one
 *       seen for an index is always kept. A later one is compared against the entries already
 *       kept for that index, in order:
 *       <ul>
 *         <li>For identifiers that can be retrieved in chunks, an existing entry with the same
 *             spill event id is skipped. An existing entry with a different spill id but the same
 *             attempt number causes the new identifier to be kept alongside it.
 *         <li>Otherwise, an existing entry with a lower attempt number is removed (logged at WARN)
 *             and replaced by the new identifier.
 *         <li>If no entry triggers either rule, the new identifier is discarded.
 *       </ul>
 * </ul>
 *
 * <p>At most {@code maxTaskOutputAtOnce} identifiers are returned. Any surplus is handed back to
 * the host via {@code host.addKnownMap} so that a later call can pick it up. For each returned
 * identifier that can be retrieved in chunks, the matching entry in
 * {@code pipelinedShuffleInfoEventsMap} (if present) is flagged {@code scheduledForDownload}.
 *
 * @param host the host whose known maps are drained
 * @return the identifiers to fetch from {@code host} in this round; never {@code null}
 */
public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
  List<InputAttemptIdentifier> origList = host.getAndClearKnownMaps();
  ListMultimap<Integer, InputAttemptIdentifier> dedupedList = LinkedListMultimap.create();
  Iterator<InputAttemptIdentifier> listItr = origList.iterator();
  while (listItr.hasNext()) {
    // we may want to try all versions of the input but with current retry
    // behavior older ones are likely to be lost and should be ignored.
    // This may be removed after TEZ-914
    InputAttemptIdentifier id = listItr.next();
    if (inputShouldBeConsumed(id)) {
      Integer inputNumber = Integer.valueOf(id.getInputIdentifier());
      // Live view of the entries already kept for this input; removals write through.
      List<InputAttemptIdentifier> oldIdList = dedupedList.get(inputNumber);
      if (oldIdList == null || oldIdList.isEmpty()) {
        // First identifier seen for this input: always keep it.
        dedupedList.put(inputNumber, id);
        continue;
      }
      // In case of pipelined shuffle, we can have multiple spills. In such cases, we can have
      // more than one item in the oldIdList.
      boolean addIdentifierToList = false;
      Iterator<InputAttemptIdentifier> oldIdIterator = oldIdList.iterator();
      while (oldIdIterator.hasNext()) {
        InputAttemptIdentifier oldId = oldIdIterator.next();
        // no need to add if spill ids are same
        if (id.canRetrieveInputInChunks()) {
          if (oldId.getSpillEventId() == id.getSpillEventId()) {
            // need to handle deterministic spills later.
            // Note: this skips the attempt-number comparison below for this entry.
            addIdentifierToList = false;
            continue;
          } else if (oldId.getAttemptNumber() == id.getAttemptNumber()) {
            // but with different spill id.
            addIdentifierToList = true;
            break;
          }
        }
        // if its from different attempt, take the latest attempt
        if (oldId.getAttemptNumber() < id.getAttemptNumber()) {
          // remove existing identifier
          oldIdIterator.remove();
          LOG.warn(
              "Old Src for InputIndex: "
                  + inputNumber
                  + " with attemptNumber: "
                  + oldId.getAttemptNumber()
                  + " was not determined to be invalid. Ignoring it for now in favour of "
                  + id.getAttemptNumber());
          addIdentifierToList = true;
          break;
        }
      }
      if (addIdentifierToList) {
        dedupedList.put(inputNumber, id);
      }
    } else {
      LOG.info("Ignoring finished or obsolete source: " + id);
    }
  }
  // Compute the final list, limited by maxTaskOutputAtOnce. Entries that do not fit are handed
  // back to the host so that a later call can schedule them.
  List<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
  int totalSize = dedupedList.size();
  for (Integer inputIndex : dedupedList.keySet()) {
    List<InputAttemptIdentifier> attemptIdentifiers = dedupedList.get(inputIndex);
    for (InputAttemptIdentifier inputAttemptIdentifier : attemptIdentifiers) {
      if (result.size() >= maxTaskOutputAtOnce) {
        // Batch is full: defer this identifier to a later round.
        host.addKnownMap(inputAttemptIdentifier);
        continue;
      }
      if (inputAttemptIdentifier.canRetrieveInputInChunks()) {
        ShuffleEventInfo shuffleEventInfo =
            pipelinedShuffleInfoEventsMap.get(inputAttemptIdentifier.getInputIdentifier());
        if (shuffleEventInfo != null) {
          shuffleEventInfo.scheduledForDownload = true;
        }
      }
      result.add(inputAttemptIdentifier);
    }
  }
  if (LOG.isDebugEnabled()) {
    // Report the number actually assigned, not the number examined (the old counter was
    // incremented for deferred entries too, so it always equalled totalSize).
    LOG.debug(
        "assigned "
            + result.size()
            + " of "
            + totalSize
            + " to "
            + host
            + " to "
            + Thread.currentThread().getName());
  }
  return result;
}