in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java [744:870]
/**
 * Registers a successful copy for the given source attempt and updates the scheduler's
 * counters and completion state.
 *
 * <p>If the input is already marked finished the call is treated as a duplicate fetch: a warning
 * is logged and a non-null {@code output} is aborted. Otherwise a non-null {@code output} is
 * committed and accounted for, the input (or, when the attempt can be retrieved in chunks, the
 * individual spill) is marked as processed, and waiters are notified once no maps remain.
 *
 * @param srcAttemptIdentifier the source attempt whose data was copied
 * @param host the host the data came from; may be {@code null}, in which case no host failure
 *     record is cleared
 * @param bytesCompressed number of compressed bytes, added to the shuffle byte counters
 * @param bytesDecompressed number of decompressed bytes, added to the decompressed counter
 * @param millis value handed to the fetch statistics logger (presumably the fetch duration in
 *     milliseconds; confirm against callers)
 * @param output the fetched output; {@code null} means a physical input completion is being
 *     registered without any data having been fetched
 * @param isLocalFetch when {@code false}, the count of failed shuffles since the last completion
 *     is reset
 * @throws IOException declared by this method (presumably raised by committing or aborting the
 *     output; confirm against {@code MapOutput})
 * @throws RssException if a chunked (pipelined) attempt carries data but no event info has been
 *     registered for its input
 */
public synchronized void copySucceeded(
    InputAttemptIdentifier srcAttemptIdentifier,
    MapHost host,
    long bytesCompressed,
    long bytesDecompressed,
    long millis,
    MapOutput output,
    boolean isLocalFetch)
    throws IOException {
  inputContext.notifyProgress();
  final int inputId = srcAttemptIdentifier.getInputIdentifier();

  // Guard: the input was already completed, so this is a duplicate fetch.
  if (isInputFinished(inputId)) {
    LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + srcAttemptIdentifier);
    // Release the resources (memory in particular). A source that generated no data hands us
    // a null output, in which case there is nothing to abort.
    if (output != null) {
      output.abort();
    }
    return;
  }

  // Only a non-local-disk copy resets the failure streak.
  if (!isLocalFetch) {
    failedShufflesSinceLastCompletion = 0;
  }

  if (output == null) {
    // No output: a physical input completion is registered without fetching any data.
    skippedInputCounter.increment(1);
  } else {
    failureCounts.remove(srcAttemptIdentifier);
    if (host != null) {
      hostFailures.remove(new HostPort(host.getHost(), host.getPort()));
    }
    output.commit();

    final Type outputType = output.getType();
    fetchStatsLogger.logIndividualFetchComplete(
        millis, bytesCompressed, bytesDecompressed, outputType.toString(), srcAttemptIdentifier);

    // Attribute the compressed bytes to the counter matching where the output landed.
    if (outputType == Type.DISK) {
      bytesShuffledToDisk.increment(bytesCompressed);
    } else if (outputType == Type.DISK_DIRECT) {
      bytesShuffledToDiskDirect.increment(bytesCompressed);
    } else {
      bytesShuffledToMem.increment(bytesCompressed);
    }
    shuffledInputsCounter.increment(1);
  }

  if (srcAttemptIdentifier.canRetrieveInputInChunks()) {
    // Pipelined shuffle: fetchers may have pulled the FINAL_UPDATE spill ahead of the others
    // because it was smaller, so success is only claimed once every spill has been retrieved.

    // Only one task attempt is allowed to proceed.
    if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
      return;
    }

    ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(inputId);
    if (eventInfo == null) {
      if (output != null) {
        throw new RssException("eventInfo should not be null");
      }
      // The shuffle event handler may invoke this directly for empty partitions, in which
      // case no event info has been registered yet.
      eventInfo = new ShuffleEventInfo(srcAttemptIdentifier);
      pipelinedShuffleInfoEventsMap.put(inputId, eventInfo);
    }

    eventInfo.spillProcessed(srcAttemptIdentifier.getSpillEventId());
    numFetchedSpills++;
    if (InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE
        == srcAttemptIdentifier.getFetchTypeInfo()) {
      eventInfo.setFinalEventId(srcAttemptIdentifier.getSpillEventId());
    }

    // Have all spills belonging to this attempt been downloaded?
    if (eventInfo.isDone()) {
      remainingMaps.decrementAndGet();
      setInputFinished(inputId);
      pipelinedShuffleInfoEventsMap.remove(inputId);
      if (LOG.isTraceEnabled()) {
        LOG.trace(
            "Removing : " + srcAttemptIdentifier + ", pending: " + pipelinedShuffleInfoEventsMap);
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("eventInfo " + eventInfo);
    }
  } else {
    // Non-chunked input: a single successful copy completes it.
    remainingMaps.decrementAndGet();
    setInputFinished(inputId);
    numFetchedSpills++;
  }

  if (remainingMaps.get() == 0) {
    notifyAll(); // Wake up the getHost() method.
    LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
  }

  // Refresh progress status and the byte counters.
  lastProgressTime = System.currentTimeMillis();
  totalBytesShuffledTillNow += bytesCompressed;
  logProgress();
  reduceShuffleBytes.increment(bytesCompressed);
  reduceBytesDecompressed.increment(bytesDecompressed);

  if (LOG.isDebugEnabled()) {
    LOG.debug(
        "src task: "
            + TezRuntimeUtils.getTaskAttemptIdentifier(
                inputContext.getSourceVertexName(),
                inputId,
                srcAttemptIdentifier.getAttemptNumber())
            + " done");
  }
  // NEWTEZ Should the output be released here when it was not committed? Possible memory leak
  // in case of speculation.
}