in client-tez/src/main/java/org/apache/tez/runtime/library/input/RssUnorderedKVInput.java [144:233]
public synchronized void start() throws IOException {
if (!isStarted.get()) {
////// Initial configuration
memoryUpdateCallbackHandler.validateUpdateReceived();
CompressionCodec codec;
if (ConfigUtils.isIntermediateInputCompressed(conf)) {
Class<? extends CompressionCodec> codecClass =
ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, conf);
} else {
codec = null;
}
boolean ifileReadAhead =
conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
int ifileReadAheadLength = 0;
int ifileBufferSize = 0;
if (ifileReadAhead) {
ifileReadAheadLength =
conf.getInt(
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
}
ifileBufferSize =
conf.getInt(
"io.file.buffer.size", TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
LOG.info(
"RssUnorderedKVInput, totalMemoryAvailable:{}, available memory:{}",
getContext().getTotalMemoryAvailableToTask(),
memoryUpdateCallbackHandler.getMemoryAssigned());
boolean compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
this.inputManager =
new RssSimpleFetchedInputAllocator(
TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()),
getContext().getUniqueIdentifier(),
getContext().getDagIdentifier(),
conf,
getContext().getTotalMemoryAvailableToTask(),
memoryUpdateCallbackHandler.getMemoryAssigned());
this.rssShuffleManager =
new RssShuffleManager(
getContext(),
conf,
getNumPhysicalInputs(),
ifileBufferSize,
ifileReadAhead,
ifileReadAheadLength,
codec,
inputManager,
shuffleId,
applicationAttemptId);
this.inputEventHandler =
new ShuffleInputEventHandlerImpl(
getContext(),
rssShuffleManager,
inputManager,
codec,
ifileReadAhead,
ifileReadAheadLength,
compositeFetch);
////// End of Initial configuration
this.rssShuffleManager.run();
this.kvReader =
createReader(
inputRecordCounter, codec, ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
List<Event> pending = new LinkedList<>();
pendingEvents.drainTo(pending);
if (pending.size() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(
getContext().getSourceVertexName()
+ ": "
+ "NoAutoStart delay in processing first event: "
+ (System.currentTimeMillis() - firstEventReceivedTime));
}
inputEventHandler.handleEvents(pending);
}
isStarted.set(true);
}
}