in client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornUnorderedKVInput.java [120:214]
public synchronized void start() throws IOException {
if (!isStarted.get()) {
////// Initial configuration
memoryUpdateCallbackHandler.validateUpdateReceived();
CompressionCodec codec = CodecUtils.getCodec(conf);
boolean compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
boolean ifileReadAhead =
conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
int ifileReadAheadLength = 0;
int ifileBufferSize = 0;
if (ifileReadAhead) {
ifileReadAheadLength =
conf.getInt(
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
}
ifileBufferSize =
conf.getInt(
"io.file.buffer.size", TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
this.inputManager =
new SimpleFetchedInputAllocator(
TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()),
getContext().getUniqueIdentifier(),
getContext().getDagIdentifier(),
conf,
getContext().getTotalMemoryAvailableToTask(),
memoryUpdateCallbackHandler.getMemoryAssigned());
String host = conf.get(TEZ_CELEBORN_LM_HOST);
int port = conf.getInt(TEZ_CELEBORN_LM_PORT, -1);
int shuffleId = conf.getInt(TEZ_SHUFFLE_ID, -1);
String appId = conf.get(TEZ_CELEBORN_APPLICATION_ID);
boolean broadcastOrOneToOne = conf.getBoolean(TEZ_BROADCAST_OR_ONETOONE, false);
CelebornConf celebornConf = CelebornTezUtils.fromTezConfiguration(conf);
ShuffleClient shuffleClient =
ShuffleClient.get(
appId,
host,
port,
celebornConf,
new UserIdentifier(
celebornConf.userSpecificTenant(), celebornConf.userSpecificUserName()),
null);
this.shuffleManager =
new CelebornShuffleManager(
getContext(),
conf,
getNumPhysicalInputs(),
shuffleClient,
ifileBufferSize,
ifileReadAhead,
ifileReadAheadLength,
codec,
inputManager,
shuffleId,
applicationAttemptId,
broadcastOrOneToOne);
this.inputEventHandler =
new ShuffleInputEventHandlerImpl(
getContext(),
shuffleManager,
inputManager,
codec,
ifileReadAhead,
ifileReadAheadLength,
compositeFetch);
////// End of Initial configuration
this.shuffleManager.run();
this.kvReader =
createReader(
inputRecordCounter, codec, ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
List<Event> pending = new LinkedList<Event>();
pendingEvents.drainTo(pending);
if (pending.size() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(
getContext().getInputOutputVertexNames()
+ ": "
+ "NoAutoStart delay in processing first event: "
+ (System.currentTimeMillis() - firstEventReceivedTime));
}
inputEventHandler.handleEvents(pending);
}
isStarted.set(true);
}
}