in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java [203:333]
/**
 * Creates an unordered, partitioned key/value writer.
 *
 * <p>Reads the shuffle-related settings from {@code conf}, sizes the in-memory buffers from
 * {@code availableMemoryBytes} and sets up the spill executor. When there is exactly one
 * partition and pipelined shuffle is off, buffering is skipped and records are written straight
 * to a single writer. That writer is an {@code IFile.FileBackedInMemIFileWriter} when
 * data-via-events with in-memory files is usable, and an {@code IFile.Writer} on the final
 * output path otherwise.
 *
 * @param outputContext context of this output; supplies vertex names and counters
 * @param conf runtime configuration
 * @param numOutputs number of outputs; passed to the superclass and to each {@code WrappedBuffer}
 * @param availableMemoryBytes memory budget for buffering; must be {@code >= 0}, and may be
 *     {@code 0} only when there is a single partition and pipelined shuffle is disabled
 * @throws IllegalArgumentException if {@code availableMemoryBytes} violates the rules above
 * @throws IOException if the local file system or the final output file cannot be obtained
 */
public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf,
    int numOutputs, long availableMemoryBytes) throws IOException {
  super(outputContext, conf, numOutputs);
  Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes");
  this.sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> "
      + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());

  // Pipelined shuffle is only effective when it is requested AND final merge in the output is
  // disabled: enabling final merge overrides the pipelined-shuffle setting.
  boolean pipelinedShuffleConf = this.conf.getBoolean(TezRuntimeConfiguration
      .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
      .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
  this.isFinalMergeEnabled = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
      TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
  this.pipelinedShuffle = pipelinedShuffleConf && !isFinalMergeEnabled;
  this.finalEvents = Lists.newLinkedList();

  this.dataViaEventsEnabled = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED,
      TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED_DEFAULT);
  // No max cap on size (intentional)
  this.dataViaEventsMaxSize = conf.getInt(
      TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE,
      TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT);
  boolean useCachedStreamConfig = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE,
      TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE_DEFAULT);
  // The in-memory (cached) writer is only used on the single-partition, non-pipelined path
  // selected further below, and only when data-via-events is enabled.
  this.useCachedStream = useCachedStreamConfig && (this.dataViaEventsEnabled && (numPartitions == 1)
      && !pipelinedShuffle);

  // A zero memory budget is only valid on the single-writer path (no buffering needed).
  if (availableMemoryBytes == 0) {
    Preconditions.checkArgument(((numPartitions == 1) && !pipelinedShuffle), "availableMemory "
        + "can be set to 0 only when numPartitions=1 and " + TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " is disabled. current numPartitions=" +
        numPartitions + ", " + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + "="
        + pipelinedShuffle);
  }
  // Ideally, should be significantly larger.
  availableMemory = availableMemoryBytes;

  // Allow unit tests to control the buffer sizes.
  int maxSingleBufferSizeBytes = conf.getInt(
      TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES,
      Integer.MAX_VALUE);
  // Derives numBuffers and sizePerBuffer from availableMemory; must run before the array below.
  computeNumBuffersAndSize(maxSingleBufferSizeBytes);
  availableBuffers = new LinkedBlockingQueue<WrappedBuffer>();
  buffers = new WrappedBuffer[numBuffers];
  // Set up only the first buffer to start with.
  buffers[0] = new WrappedBuffer(numOutputs, sizePerBuffer);
  numInitializedBuffers = 1;
  if (LOG.isDebugEnabled()) {
    LOG.debug(sourceDestNameTrimmed + ": " + "Initializing Buffer #" +
        numInitializedBuffers + " with size=" + sizePerBuffer);
  }
  currentBuffer = buffers[0];

  // Records are serialized into this shared stream before being copied to a buffer/writer.
  baos = new ByteArrayOutputStream();
  dos = new NonSyncDataOutputStream(baos);
  keySerializer.open(dos);
  valSerializer.open(dos);
  rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();

  // Spill pool: at most max(2, numBuffers / 2) threads. The SynchronousQueue does no queuing,
  // so submissions beyond the thread limit are throttled by availableSlots below.
  int maxThreads = Math.max(2, numBuffers/2);
  //TODO: Make use of TezSharedExecutor later
  ExecutorService executor = new ThreadPoolExecutor(1, maxThreads,
      60L, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(),
      new ThreadFactoryBuilder()
          .setDaemon(true)
          .setNameFormat(
              "UnorderedOutSpiller {" + TezUtilsInternal.cleanVertexName(
                  outputContext.getDestinationVertexName()) + "} #%d")
          .build()
  );
  // to restrict submission of more tasks than threads (e.g numBuffers > numThreads)
  // This is maxThreads - 1, to avoid race between callback thread releasing semaphore and the
  // thread calling tryAcquire.
  availableSlots = new Semaphore(maxThreads - 1, true);
  spillExecutor = MoreExecutors.listeningDecorator(executor);

  numRecordsPerPartition = new int[numPartitions];
  reportPartitionStats = ReportPartitionStats.fromString(
      conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
          TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
  // Per-partition sizes are only tracked when partition stats reporting is enabled.
  sizePerPartition = (reportPartitionStats.isEnabled()) ?
      new long[numPartitions] : null;
  outputLargeRecordsCounter = outputContext.getCounters().findCounter(
      TaskCounter.OUTPUT_LARGE_RECORDS);
  indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;

  if (numPartitions == 1 && !pipelinedShuffle) {
    //special case, where in only one partition is available.
    // Bypass the buffers and write every record directly to a single writer.
    skipBuffers = true;
    if (this.useCachedStream) {
      writer = new IFile.FileBackedInMemIFileWriter(keySerialization, valSerialization, rfs,
          outputFileHandler, keyClass, valClass, codec, outputRecordsCounter,
          outputRecordBytesCounter, dataViaEventsMaxSize);
    } else {
      finalOutPath = outputFileHandler.getOutputFileForWrite();
      writer = new IFile.Writer(keySerialization, valSerialization, rfs, finalOutPath, keyClass, valClass,
          codec, outputRecordsCounter, outputRecordBytesCounter);
      ensureSpillFilePermissions(finalOutPath, rfs);
    }
  } else {
    skipBuffers = false;
    writer = null;
  }

  LOG.info(sourceDestNameTrimmed + ": "
      + "numBuffers=" + numBuffers
      + ", sizePerBuffer=" + sizePerBuffer
      + ", skipBuffers=" + skipBuffers
      + ", numPartitions=" + numPartitions
      + ", availableMemory=" + availableMemory
      + ", maxSingleBufferSizeBytes=" + maxSingleBufferSizeBytes
      + ", pipelinedShuffle=" + pipelinedShuffle
      + ", isFinalMergeEnabled=" + isFinalMergeEnabled
      + ", reportPartitionStats=" + reportPartitionStats
      + ", dataViaEventsEnabled=" + dataViaEventsEnabled
      + ", dataViaEventsMaxSize=" + dataViaEventsMaxSize
      + ", useCachedStreamConfig=" + useCachedStreamConfig
      + ", useCachedStream=" + useCachedStream
  );
}