in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcher.java [94:172]
public void copyFromRssServer() throws IOException {
CompressedShuffleBlock compressedBlock = null;
ByteBuffer compressedData = null;
long blockStartFetch = 0;
// fetch a block
if (!hasPendingData) {
final long startFetch = System.currentTimeMillis();
blockStartFetch = System.currentTimeMillis();
compressedBlock = shuffleReadClient.readShuffleBlockData();
if (compressedBlock != null) {
compressedData = compressedBlock.getByteBuffer();
}
long fetchDuration = System.currentTimeMillis() - startFetch;
readTime += fetchDuration;
}
// uncompress the block
if (!hasPendingData && compressedData != null) {
final long startDecompress = System.currentTimeMillis();
int uncompressedLen = compressedBlock.getUncompressLength();
ByteBuffer decompressedBuffer = ByteBuffer.allocate(uncompressedLen);
codec.decompress(compressedData, uncompressedLen, decompressedBuffer, 0);
uncompressedData = decompressedBuffer.array();
unCompressionLength += compressedBlock.getUncompressLength();
long decompressDuration = System.currentTimeMillis() - startDecompress;
decompressTime += decompressDuration;
}
if (uncompressedData != null) {
// start to merge
final long startSerialization = System.currentTimeMillis();
int compressedDataLength = compressedData != null ? compressedData.capacity() : 0;
if (issueMapOutputMerge(compressedDataLength, blockStartFetch)) {
long serializationDuration = System.currentTimeMillis() - startSerialization;
serializeTime += serializationDuration;
// if reserve successes, reset status for next fetch
if (hasPendingData) {
waitTime += System.currentTimeMillis() - startWait;
}
hasPendingData = false;
uncompressedData = null;
} else {
LOG.info("UncompressedData is null");
// if reserve fail, return and wait
waitCount++;
startWait = System.currentTimeMillis();
return;
}
// update some status
copyBlockCount++;
copyTime = readTime + decompressTime + serializeTime + waitTime;
} else {
LOG.info("UncompressedData is null");
// finish reading data, close related reader and check data consistent
shuffleReadClient.close();
shuffleReadClient.checkProcessedBlockIds();
shuffleReadClient.logStatics();
LOG.info(
"Reduce task partition:"
+ partitionId
+ " read block cnt: "
+ copyBlockCount
+ " cost "
+ readTime
+ " ms to fetch and "
+ decompressTime
+ " ms to decompress with unCompressionLength["
+ unCompressionLength
+ "] and "
+ serializeTime
+ " ms to serialize and "
+ waitTime
+ " ms to wait resource, total copy time: "
+ copyTime);
LOG.info("Stop fetcher");
stopFetch();
}
}