in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java [220:401]
/**
 * Builds the shuffle manager for one input.
 *
 * <p>The constructor delegates to the superclass, records its arguments, looks up the task
 * counters it updates, reads the fetch-related switches from {@code conf}, and creates the
 * fetcher and scheduler executors together with the scheduler callable.
 *
 * @param inputContext context used for counters, the source vertex name, the host name, the
 *     shared framework executor and the auxiliary-service metadata
 * @param conf configuration the fetch-related settings are read from
 * @param numInputs number of inputs; sizes the completed-input bit set and caps the number of
 *     fetcher threads
 * @param bufferSize stored as the IFile buffer size
 * @param ifileReadAheadEnabled stored as the IFile read-ahead flag
 * @param ifileReadAheadLength stored as the IFile read-ahead length
 * @param codec compression codec; {@code null} is reported as "NoCompressionCodec" in the log
 * @param inputAllocator stored as the input manager
 * @param shuffleId stored as the shuffle id of this manager
 * @param applicationAttemptId stored as the application attempt id of this manager
 * @throws IOException if one of the setup calls that declares it fails (for example obtaining
 *     the local file system)
 */
public RssShuffleManager(
    InputContext inputContext,
    Configuration conf,
    int numInputs,
    int bufferSize,
    boolean ifileReadAheadEnabled,
    int ifileReadAheadLength,
    CompressionCodec codec,
    FetchedInputAllocator inputAllocator,
    int shuffleId,
    ApplicationAttemptId applicationAttemptId)
    throws IOException {
  super(
      inputContext,
      conf,
      numInputs,
      bufferSize,
      ifileReadAheadEnabled,
      ifileReadAheadLength,
      codec,
      inputAllocator);

  // Plain copies of the constructor arguments.
  this.inputContext = inputContext;
  this.conf = conf;
  this.numInputs = numInputs;
  this.shuffleId = shuffleId;
  this.applicationAttemptId = applicationAttemptId;
  this.ifileBufferSize = bufferSize;
  this.ifileReadAhead = ifileReadAheadEnabled;
  this.ifileReadAheadLength = ifileReadAheadLength;
  this.codec = codec;
  this.inputManager = inputAllocator;

  // Counters describing shuffle volume and failures.
  this.shuffledInputsCounter =
      inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
  this.failedShufflesCounter =
      inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
  this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
  this.decompressedDataSizeCounter =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
  this.bytesShuffledToDiskCounter =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
  this.bytesShuffledToMemCounter =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
  this.bytesShuffledDirectDiskCounter =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);

  // Boolean switches taken from the configuration.
  this.localDiskFetchEnabled =
      conf.getBoolean(
          TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
          TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
  this.sharedFetchEnabled =
      conf.getBoolean(
          TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH,
          TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT);
  this.verifyDiskChecksum =
      conf.getBoolean(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);
  // Fixed value; not read from the configuration here.
  this.maxTimeToWaitForReportMillis = 1;

  // Counters describing shuffle timing.
  this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
  this.firstEventReceived =
      inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
  this.lastEventReceived =
      inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);

  this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
  this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());

  this.completedInputSet = new BitSet(numInputs);
  // In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt. We do
  // not know upfront the number of spills from source.
  this.completedInputs = new LinkedBlockingDeque<>();
  this.knownSrcHosts = new ConcurrentHashMap<>();
  this.pendingHosts = new LinkedBlockingQueue<>();
  this.obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<>());
  this.rssRunningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<>());

  // Never use more fetcher threads than there are inputs.
  this.numFetchers =
      Math.min(
          conf.getInt(
              TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
              TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT),
          numInputs);

  // Fetchers run either on the framework-provided executor or on a private daemon pool.
  final boolean useSharedFetcherPool =
      conf.getBoolean(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT);
  final String fetcherThreadNameFormat = "Fetcher_B {" + srcNameTrimmed + "} #%d";
  final ExecutorService fetcherPool =
      useSharedFetcherPool
          ? inputContext.createTezFrameworkExecutorService(numFetchers, fetcherThreadNameFormat)
          : Executors.newFixedThreadPool(
              numFetchers,
              new ThreadFactoryBuilder()
                  .setDaemon(true)
                  .setNameFormat(fetcherThreadNameFormat)
                  .build());
  this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherPool);

  // The scheduler gets a dedicated single daemon thread.
  this.schedulerExecutor =
      MoreExecutors.listeningDecorator(
          Executors.newFixedThreadPool(
              1,
              new ThreadFactoryBuilder()
                  .setDaemon(true)
                  .setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}")
                  .build()));
  this.rssSchedulerCallable = new RssRunShuffleCallable(conf);

  this.startTime = System.currentTimeMillis();
  this.lastProgressTime = this.startTime;

  this.asyncHttp =
      conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
  this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);

  this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
  this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
  // Sort the readable local directories before publishing them in the field.
  final Path[] readableLocalDirs =
      Iterables.toArray(localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
  if (readableLocalDirs != null) {
    Arrays.sort(readableLocalDirs);
  }
  this.localDisks = readableLocalDirs;

  this.localhostName = inputContext.getExecutionContext().getHostName();

  // The shuffle port is decoded from the metadata published by the auxiliary shuffle service.
  final String auxiliaryServiceId =
      conf.get(
          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
  this.shufflePort =
      ShuffleUtils.deserializeShuffleProviderMetaData(
          inputContext.getServiceProviderMetaData(auxiliaryServiceId));

  // Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would be
  // approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL.
  // The result is also kept at 1 or above.
  final int configuredMaxTaskOutputAtOnce =
      conf.getInt(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT);
  this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, configuredMaxTaskOutputAtOnce));

  this.shuffleInfoEventsMap = new ConcurrentHashMap<>();

  // One-line summary of the effective settings.
  final String codecName = (codec == null) ? "NoCompressionCodec" : codec.getClass().getName();
  final StringBuilder summary = new StringBuilder();
  summary.append(srcNameTrimmed);
  summary.append(": numInputs=").append(numInputs);
  summary.append(", compressionCodec=").append(codecName);
  summary.append(", numFetchers=").append(numFetchers);
  summary.append(", ifileBufferSize=").append(ifileBufferSize);
  summary.append(", ifileReadAheadEnabled=").append(ifileReadAhead);
  summary.append(", ifileReadAheadLength=").append(ifileReadAheadLength);
  summary.append(", localDiskFetchEnabled=").append(localDiskFetchEnabled);
  summary.append(", sharedFetchEnabled=").append(sharedFetchEnabled);
  summary.append(", ").append(httpConnectionParams.toString());
  summary.append(", maxTaskOutputAtOnce=").append(maxTaskOutputAtOnce);
  LOG.info(summary.toString());
}