in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java [135:247]
/**
 * Sets up a pipelined sorter: picks the minimum block size, plans how the
 * available sort memory is split into blocks, allocates the first block (or
 * every block when lazy allocation is disabled), creates the first
 * {@code SortSpan}, the span merger and the sort thread pool, and opens the
 * key/value serializers on the first span's output.
 *
 * @param outputContext forwarded to the parent constructor; also used here to
 *        obtain vertex names for the setup log line and the sort thread names
 * @param conf forwarded to the parent constructor; also read here for the
 *        minimum block size and the shuffle auxiliary service id
 * @param numOutputs forwarded to the parent constructor
 * @param initialMemoryAvailable forwarded to the parent constructor
 * @throws IOException as declared; propagated from the setup calls made here
 * @throws IllegalArgumentException if lazy allocation is disabled and the
 *         configured minimum block size (in MB) is not greater than 0 and
 *         less than 2047
 * @throws IllegalStateException if no buffer could be allocated
 */
public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
    long initialMemoryAvailable) throws IOException {
  super(outputContext, conf, numOutputs, initialMemoryAvailable);

  lazyAllocateMem = this.conf.getBoolean(TezRuntimeConfiguration
      .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, TezRuntimeConfiguration
      .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT);

  if (lazyAllocateMem) {
    /*
     * When lazy-allocation is enabled, framework takes care of auto
     * allocating memory on need basis. Desirable block size is set to 256MB
     */
    //256 MB - 64 bytes. See comment for the 32MB allocation.
    MIN_BLOCK_SIZE = ((256 << 20) - 64);
  } else {
    int minBlockSize = conf.getInt(TezRuntimeConfiguration
            .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB,
        TezRuntimeConfiguration
            .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT);
    // The upper bound keeps (minBlockSize << 20) inside the positive int range.
    Preconditions.checkArgument(
        (minBlockSize > 0 && minBlockSize < 2047),
        TezRuntimeConfiguration
            .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB
            + "=" + minBlockSize + " should be a positive value between 0 and 2047");
    MIN_BLOCK_SIZE = minBlockSize << 20;
  }

  // The same builder is logged twice: once before any memory is allocated and
  // once more, with the allocation details appended, after allocation.
  StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ")
      .append(outputContext.getInputOutputVertexNames()).append(": ");

  partitionBits = bitcount(partitions) + 1;

  // Pipelined shuffle is only used when it is configured AND final merge is off.
  boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
      .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
      .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
  pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle;

  auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
      TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);

  //sanity checks
  final long sortmb = this.availableMemoryMb;

  // buffers and accounting (MB -> bytes)
  long maxMemLimit = sortmb << 20;

  // No leading separator here: the builder already ends with ": ".
  initialSetupLogLine.append("UsingHashComparator=");
  // k/v serialization
  if (comparator instanceof ProxyComparator) {
    hasher = (ProxyComparator) comparator;
    initialSetupLogLine.append(true);
  } else {
    hasher = null;
    initialSetupLogLine.append(false);
  }

  LOG.info(initialSetupLogLine.toString());

  // Dry run over the memory budget: count how many blocks it splits into and
  // sum the usable capacity, with each block rounded down to a multiple of
  // METASIZE. Nothing is allocated in this loop.
  long totalCapacityWithoutMeta = 0;
  long availableMem = maxMemLimit;
  int numBlocks = 0;
  while (availableMem > 0) {
    long size = Math.min(availableMem, computeBlockSize(availableMem, maxMemLimit));
    int sizeWithoutMeta = (int) ((size) - (size % METASIZE));
    totalCapacityWithoutMeta += sizeWithoutMeta;
    availableMem -= size;
    numBlocks++;
  }
  currentAllocatableMemory = maxMemLimit;
  maxNumberOfBlocks = numBlocks;
  capacity = totalCapacityWithoutMeta;

  buffers = Lists.newArrayListWithCapacity(maxNumberOfBlocks);
  bufferUsage = Lists.newArrayListWithCapacity(maxNumberOfBlocks);
  allocateSpace(); //Allocate the first block
  if (!lazyAllocateMem) {
    LOG.info("Pre allocating rest of memory buffers upfront");
    while (allocateSpace() != null) {
      // keep allocating until allocateSpace() returns null
    }
  }

  // Verify before buffers.get(0) is touched below, so that a failed allocation
  // surfaces as this descriptive error rather than an IndexOutOfBoundsException.
  Preconditions.checkState(buffers.size() > 0, "Atleast one buffer needs to be present");

  initialSetupLogLine.append(", #blocks=").append(maxNumberOfBlocks);
  initialSetupLogLine.append(", maxMemUsage=").append(maxMemLimit);
  initialSetupLogLine.append(", lazyAllocateMem=").append(
      lazyAllocateMem);
  initialSetupLogLine.append(", minBlockSize=").append(MIN_BLOCK_SIZE);
  initialSetupLogLine.append(", initial BLOCK_SIZE=").append(buffers.get(0).capacity());
  initialSetupLogLine.append(", finalMergeEnabled=").append(isFinalMergeEnabled());
  initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle);
  initialSetupLogLine.append(", sendEmptyPartitions=").append(sendEmptyPartitionDetails);
  initialSetupLogLine.append(", ").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append(
      "=").append(
      sortmb);
  LOG.info(initialSetupLogLine.toString());

  // NOTE(review): 1024 * 1024 and 16 look like item-count / per-item size
  // hints for the first span - confirm against the SortSpan constructor.
  span = new SortSpan(buffers.get(bufferIndex), 1024 * 1024, 16, this.comparator);
  merger = new SpanMerger(); // SpanIterators are comparable

  final int sortThreads =
      this.conf.getInt(
          TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS,
          TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT);
  // Daemon threads, named after the source and destination vertices.
  sortmaster = Executors.newFixedThreadPool(sortThreads,
      new ThreadFactoryBuilder().setDaemon(true)
          .setNameFormat("Sorter {" + TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> "
              + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d")
          .build());

  // Both serializers write into the output of the first span.
  valSerializer.open(span.out);
  keySerializer.open(span.out);

  minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
  deflater = TezCommonUtils.newBestCompressionDeflater();
  finalEvents = Lists.newLinkedList();
}