in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java [798:879]
public void addKnownInput(
String hostName,
int port,
CompositeInputAttemptIdentifier srcAttemptIdentifier,
int srcPhysicalIndex) {
HostPort identifier = new HostPort(hostName, port);
InputHost host = knownSrcHosts.get(identifier);
if (host == null) {
host = new InputHost(identifier);
InputHost old = knownSrcHosts.putIfAbsent(identifier, host);
if (old != null) {
host = old;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(
srcNameTrimmed + ": " + "Adding input: " + srcAttemptIdentifier + ", to host: " + host);
}
if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
return;
}
int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
for (int i = 0; i < srcAttemptIdentifier.getInputIdentifierCount(); i++) {
if (shuffleInfoEventsMap.get(inputIdentifier + i) == null) {
shuffleInfoEventsMap.put(
inputIdentifier + i, new ShuffleEventInfo(srcAttemptIdentifier.expand(i)));
LOG.info(
"AddKnownInput, srcAttemptIdentifier:{}, i:{}, expand:{}, map:{}",
srcAttemptIdentifier,
i,
srcAttemptIdentifier.expand(i),
shuffleInfoEventsMap);
}
}
host.addKnownInput(
srcPhysicalIndex, srcAttemptIdentifier.getInputIdentifierCount(), srcAttemptIdentifier);
lock.lock();
try {
boolean added = pendingHosts.offer(host);
if (!added) {
String errorMessage = "Unable to add host: " + host.getIdentifier() + " to pending queue";
LOG.error(errorMessage);
throw new TezUncheckedException(errorMessage);
}
wakeLoop.signal();
} finally {
lock.unlock();
}
LOG.info(
"AddKnowInput, hostname:{}, port:{}, srcAttemptIdentifier:{}, srcPhysicalIndex:{}",
hostName,
port,
srcAttemptIdentifier,
srcPhysicalIndex);
lock.lock();
try {
for (int i = 0; i < srcAttemptIdentifier.getInputIdentifierCount(); i++) {
int p = srcPhysicalIndex + i;
LOG.info(
"PartitionToInput, original:{}, add:{}, now:{}",
srcAttemptIdentifier,
srcAttemptIdentifier.expand(i),
partitionToInput.get(p));
if (!allRssPartition.contains(srcPhysicalIndex + i)) {
pendingPartition.add(p);
}
allRssPartition.add(p);
partitionToInput.putIfAbsent(p, new ArrayList<>());
partitionToInput.get(p).add(srcAttemptIdentifier);
LOG.info("Add partition:{}, after add, now partition:{}", p, allRssPartition);
}
numWithDataInput.incrementAndGet();
LOG.info("numWithDataInput:{}.", numWithDataInput.get());
} finally {
lock.unlock();
}
}