in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java [225:348]
/**
 * Builds a shuffle manager for one input.
 *
 * <p>The constructor looks up the task counters it updates, reads the shuffle-related settings
 * from {@code conf}, creates the fetcher and scheduler executors, derives the job token secret
 * manager, resolves the local file system / local disks / shuffle port, and finally logs a
 * one-line summary of the effective settings at INFO level. No fetch work is submitted here.
 *
 * @param inputContext context used for counters, vertex names, service metadata, the host name
 *                     and (optionally) the framework executor service
 * @param conf configuration the shuffle settings are read from
 * @param numInputs number of inputs; sizes the completed-input bit set and caps the number of
 *                  fetchers
 * @param bufferSize stored as the IFile buffer size
 * @param ifileReadAheadEnabled stored as the IFile read-ahead flag
 * @param ifileReadAheadLength stored as the IFile read-ahead length
 * @param codec compression codec; {@code null} is accepted and logged as "NoCompressionCodec"
 * @param inputAllocator stored as the input manager
 * @throws IOException propagated from the initialization calls made here (presumably the local
 *                     file system / local directory / shuffle metadata lookups)
 */
public ShuffleManager(InputContext inputContext, Configuration conf, int numInputs,
    int bufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength,
    CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException {
  // --- Values handed in by the caller -------------------------------------------------------
  this.inputContext = inputContext;
  this.numInputs = numInputs;
  this.ifileBufferSize = bufferSize;
  this.ifileReadAhead = ifileReadAheadEnabled;
  this.ifileReadAheadLength = ifileReadAheadLength;
  this.codec = codec;
  this.inputManager = inputAllocator;

  // --- Counters for records, inputs and bytes shuffled --------------------------------------
  this.approximateInputRecords =
      inputContext.getCounters().findCounter(TaskCounter.APPROXIMATE_INPUT_RECORDS);
  this.shuffledInputsCounter =
      inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
  this.failedShufflesCounter =
      inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
  this.bytesShuffledCounter =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
  this.decompressedDataSizeCounter =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
  this.bytesShuffledToDiskCounter =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
  this.bytesShuffledToMemCounter =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
  this.bytesShuffledDirectDiskCounter =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);

  // --- Fetch behaviour switches read from the configuration ---------------------------------
  this.localDiskFetchEnabled = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
      TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
  this.sharedFetchEnabled = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH,
      TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT);
  this.verifyDiskChecksum = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);
  this.maxTimeToWaitForReportMillis = conf.getInt(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BATCH_WAIT,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BATCH_WAIT_DEFAULT);

  // --- Timing counters ----------------------------------------------------------------------
  this.shufflePhaseTime =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
  this.firstEventReceived =
      inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
  this.lastEventReceived =
      inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);

  this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
  this.enableFetcherTestingErrors = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS_DEFAULT);

  // "<source vertex> -> <this vertex>", used in thread names and in the log line below.
  final String cleanedSourceVertex =
      TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
  final String cleanedTaskVertex =
      TezUtilsInternal.cleanVertexName(inputContext.getTaskVertexName());
  this.sourceDestNameTrimmed = cleanedSourceVertex + " -> " + cleanedTaskVertex;

  // --- Bookkeeping collections --------------------------------------------------------------
  this.completedInputSet = new BitSet(numInputs);
  // In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt.
  // We do not know upfront the number of spills from source, so this deque is unbounded.
  this.completedInputs = new LinkedBlockingDeque<FetchedInput>();
  this.knownSrcHosts = new ConcurrentHashMap<HostPort, InputHost>();
  this.pendingHosts = new LinkedBlockingQueue<InputHost>();
  this.obsoletedInputs =
      Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
  this.runningFetchers =
      Collections.newSetFromMap(new ConcurrentHashMap<Fetcher, Boolean>());

  // --- Fetcher pool: never more fetchers than inputs ----------------------------------------
  final int configuredParallelCopies = conf.getInt(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
  this.numFetchers = Math.min(configuredParallelCopies, numInputs);

  final boolean useSharedFetcherPool = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT);
  // Either the framework-provided executor or a private pool of daemon threads; both use the
  // same thread-name pattern.
  final ExecutorService rawFetcherPool = useSharedFetcherPool
      ? inputContext.createTezFrameworkExecutorService(numFetchers,
          "Fetcher_B {" + sourceDestNameTrimmed + "} #%d")
      : Executors.newFixedThreadPool(numFetchers,
          new ThreadFactoryBuilder()
              .setDaemon(true)
              .setNameFormat("Fetcher_B {" + sourceDestNameTrimmed + "} #%d")
              .build());
  this.fetcherExecutor = MoreExecutors.listeningDecorator(rawFetcherPool);

  // --- Single-threaded scheduler that runs the shuffle callable -----------------------------
  final ExecutorService rawSchedulerPool = Executors.newFixedThreadPool(1,
      new ThreadFactoryBuilder()
          .setDaemon(true)
          .setNameFormat("ShuffleRunner {" + sourceDestNameTrimmed + "}")
          .build());
  this.schedulerExecutor = MoreExecutors.listeningDecorator(rawSchedulerPool);
  this.schedulerCallable = new RunShuffleCallable(conf);

  this.startTime = System.currentTimeMillis();
  this.lastProgressTime = startTime;

  // --- Job token secret, taken from the auxiliary shuffle service's consumer metadata -------
  final String auxServiceId = conf.get(
      TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
      TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
  final SecretKey jobTokenSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(
      inputContext.getServiceConsumerMetaData(auxServiceId));
  this.jobTokenSecretMgr = new JobTokenSecretManager(jobTokenSecret, conf);

  // --- HTTP settings ------------------------------------------------------------------------
  this.asyncHttp =
      conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
  this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);

  // --- Local file system, local directories and this host's shuffle endpoint ----------------
  this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
  this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
  this.localDisks =
      Iterables.toArray(localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
  this.localhostName = inputContext.getExecutionContext().getHostName();
  final ByteBuffer providerMetaData = inputContext.getServiceProviderMetaData(auxServiceId);
  this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(providerMetaData);

  // Setting this to a very high value can lead to an HTTP 400 error. Cap it at 75: every
  // attempt id is approximately 48 bytes, and 48 * 75 = 3600 leaves some room for the other
  // information in the URL. Values below 1 are raised to 1.
  final int configuredMaxTaskOutput = conf.getInt(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT);
  this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, configuredMaxTaskOutput));

  Arrays.sort(this.localDisks);

  this.shuffleInfoEventsMap = new ConcurrentHashMap<Integer, ShuffleEventInfo>();

  // --- One-line summary of the effective settings -------------------------------------------
  final String codecName;
  if (codec != null) {
    codecName = codec.getClass().getName();
  } else {
    codecName = "NoCompressionCodec";
  }
  LOG.info(sourceDestNameTrimmed
      + ": numInputs=" + numInputs
      + ", compressionCodec=" + codecName
      + ", numFetchers=" + numFetchers
      + ", ifileBufferSize=" + ifileBufferSize
      + ", ifileReadAheadEnabled=" + ifileReadAhead
      + ", ifileReadAheadLength=" + ifileReadAheadLength
      + ", localDiskFetchEnabled=" + localDiskFetchEnabled
      + ", sharedFetchEnabled=" + sharedFetchEnabled
      + ", " + httpConnectionParams.toString()
      + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce
      + ", asyncHttp=" + asyncHttp);
}