in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java [256:448]
/**
 * Creates the scheduler for the ordered-grouped shuffle of one source vertex.
 *
 * <p>The constructor first reads and validates every shuffle setting from {@code conf} and looks
 * up the counters it (and its fetchers) update. Only after all of that succeeds does it create
 * the fetcher executor and start the referee. The referee is started as the very last statement
 * for two reasons:
 * <ul>
 *   <li>If any setting is invalid or malformed, construction fails before a running thread or
 *       executor exists, so neither can leak.</li>
 *   <li>Every field written here happens-before the referee begins running. This assumes
 *       {@code Referee} is a {@link Thread}, as {@code start()} suggests. The referee obtains
 *       {@code this} before the constructor returns, so it cannot rely on final-field freeze
 *       semantics.</li>
 * </ul>
 *
 * @param inputContext context supplying counters, the local host name, shuffle-service metadata
 *     and (optionally) the framework's shared executor service
 * @param conf runtime configuration the shuffle settings are read from
 * @param numberOfInputs number of inputs to fetch; sizes the completion tracking, caps the
 *     number of fetchers and seeds the default abort limit
 * @param exceptionReporter reporter stored for later error propagation
 * @param mergeManager merge manager stored for later use
 * @param allocator allocator for fetched inputs, stored for later use
 * @param startTime start timestamp; also used as the initial last-progress time
 * @param codec compression codec stored for later use (may be null — confirm with callers)
 * @param ifileReadAhead whether IFile read-ahead is enabled
 * @param ifileReadAheadLength IFile read-ahead length
 * @param srcNameTrimmed source name; embedded in fetcher thread names
 * @throws IOException propagated from obtaining the local file system or from decoding the
 *     shuffle-service metadata / job token
 * @throws IllegalArgumentException if a validated setting (min failures per host, or one of the
 *     failure/stall/progress fractions) is negative
 */
public ShuffleScheduler(InputContext inputContext,
                        Configuration conf,
                        int numberOfInputs,
                        ExceptionReporter exceptionReporter,
                        MergeManager mergeManager,
                        FetchedInputAllocatorOrderedGrouped allocator,
                        long startTime,
                        CompressionCodec codec,
                        boolean ifileReadAhead,
                        int ifileReadAheadLength,
                        String srcNameTrimmed) throws IOException {
  this.inputContext = inputContext;
  this.conf = conf;
  this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
  this.exceptionReporter = exceptionReporter;
  this.allocator = allocator;
  this.mergeManager = mergeManager;
  this.numInputs = numberOfInputs;

  // A negative configured limit means "derive it from the number of inputs".
  int abortFailureLimitConf = conf.getInt(TezRuntimeConfiguration
      .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, TezRuntimeConfiguration
      .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT_DEFAULT);
  if (abortFailureLimitConf <= -1) {
    abortFailureLimit = Math.max(15, numberOfInputs / 10);
  } else {
    //No upper cap, as user is setting this intentionally
    abortFailureLimit = abortFailureLimitConf;
  }
  remainingMaps = new AtomicInteger(numberOfInputs);
  finishedMaps = new BitSet(numberOfInputs);
  this.ifileReadAhead = ifileReadAhead;
  this.ifileReadAheadLength = ifileReadAheadLength;
  this.srcNameTrimmed = srcNameTrimmed;
  this.codec = codec;

  // Never run more fetchers than there are inputs to fetch.
  int configuredNumFetchers =
      conf.getInt(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
  numFetchers = Math.min(configuredNumFetchers, numInputs);
  localDiskFetchEnabled = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
      TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);

  // Failure / stall / progress thresholds; each must be non-negative.
  this.minFailurePerHost = conf.getInt(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT);
  Preconditions.checkArgument(minFailurePerHost >= 0,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST
          + "=" + minFailurePerHost + " should not be negative");
  this.hostFailureFraction = conf.getFloat(TezRuntimeConfiguration
          .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION,
      TezRuntimeConfiguration
          .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION_DEFAULT);
  // Validated like the other fractions below; previously this one alone was unchecked.
  Preconditions.checkArgument(hostFailureFraction >= 0,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION
          + "=" + hostFailureFraction + " should not be negative");
  this.maxStallTimeFraction = conf.getFloat(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION_DEFAULT);
  Preconditions.checkArgument(maxStallTimeFraction >= 0,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION
          + "=" + maxStallTimeFraction + " should not be negative");
  this.minReqProgressFraction = conf.getFloat(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION,
      TezRuntimeConfiguration
          .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION_DEFAULT);
  Preconditions.checkArgument(minReqProgressFraction >= 0,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION
          + "=" + minReqProgressFraction + " should not be negative");
  this.maxAllowedFailedFetchFraction = conf.getFloat(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION_DEFAULT);
  Preconditions.checkArgument(maxAllowedFailedFetchFraction >= 0,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION
          + "=" + maxAllowedFailedFetchFraction + " should not be negative");
  this.checkFailedFetchSinceLastCompletion = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT);

  // Local host and shuffle-service endpoint.
  this.localHostname = inputContext.getExecutionContext().getHostName();
  String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
      TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
  final ByteBuffer shuffleMetadata =
      inputContext.getServiceProviderMetaData(auxiliaryService);
  this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);

  // Counters used by the ShuffleScheduler
  this.shuffledInputsCounter = inputContext.getCounters().findCounter(
      TaskCounter.NUM_SHUFFLED_INPUTS);
  this.reduceShuffleBytes = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
  this.reduceBytesDecompressed = inputContext.getCounters().findCounter(
      TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
  this.failedShuffleCounter = inputContext.getCounters().findCounter(
      TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
  this.bytesShuffledToDisk = inputContext.getCounters().findCounter(
      TaskCounter.SHUFFLE_BYTES_TO_DISK);
  this.bytesShuffledToDiskDirect =
      inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
  this.bytesShuffledToMem = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);

  // Counters used by Fetchers
  ioErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
      ShuffleErrors.IO_ERROR.toString());
  wrongLengthErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
      ShuffleErrors.WRONG_LENGTH.toString());
  badIdErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
      ShuffleErrors.BAD_ID.toString());
  wrongMapErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
      ShuffleErrors.WRONG_MAP.toString());
  connectionErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
      ShuffleErrors.CONNECTION.toString());
  wrongReduceErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
      ShuffleErrors.WRONG_REDUCE.toString());

  this.startTime = startTime;
  this.lastProgressTime = startTime;

  // Transport / security settings.
  this.sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
  this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
  this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
  SecretKey jobTokenSecret = ShuffleUtils
      .getJobTokenSecretFromTokenBytes(inputContext
          .getServiceConsumerMetaData(auxiliaryService));
  this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret, conf);

  this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5);
  this.maxFetchFailuresBeforeReporting =
      conf.getInt(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT);
  this.reportReadErrorImmediately =
      conf.getBoolean(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT);
  this.verifyDiskChecksum = conf.getBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);
  // Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would
  // be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL.
  this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)));
  this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
  this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
  this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
  this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
  this.maxPenaltyTime = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS_DEFAULT);
  this.enableFetcherTestingErrors =
      conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS_DEFAULT);
  pipelinedShuffleInfoEventsMap = Maps.newConcurrentMap();

  // All configuration has been read and validated. Only now acquire resources, so a bad setting
  // above cannot leave an executor or a running thread behind.
  final ExecutorService fetcherRawExecutor;
  if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT)) {
    fetcherRawExecutor = inputContext.createTezFrameworkExecutorService(numFetchers,
        "Fetcher_O {" + srcNameTrimmed + "} #%d");
  } else {
    fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder()
        .setDaemon(true).setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build());
  }
  this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);

  LOG.info("ShuffleScheduler running for sourceVertex: "
      + inputContext.getSourceVertexName() + " with configuration: "
      + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
      + ", reportReadErrorImmediately=" + reportReadErrorImmediately
      + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
      + ", abortFailureLimit=" + abortFailureLimit
      + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce
      + ", numFetchers=" + numFetchers
      + ", hostFailureFraction=" + hostFailureFraction
      + ", minFailurePerHost=" + minFailurePerHost
      + ", maxAllowedFailedFetchFraction=" + maxAllowedFailedFetchFraction
      + ", maxStallTimeFraction=" + maxStallTimeFraction
      + ", minReqProgressFraction=" + minReqProgressFraction
      + ", checkFailedFetchSinceLastCompletion=" + checkFailedFetchSinceLastCompletion
      + ", asyncHttp=" + asyncHttp
      + ", enableFetcherTestingErrors=" + enableFetcherTestingErrors
  );

  // Start the referee last: every field assignment above then happens-before it runs
  // (Thread.start rule), even though it sees `this` before the constructor returns.
  this.referee = new Referee();
  referee.start();
}