in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java [691:794]
/**
 * Builds a {@link Fetcher} for the next pending partition range of {@code inputHost}.
 *
 * <p>The range obtained from {@code inputHost.clearAndGetOnePartitionRange()} is filtered in
 * place:
 *
 * <ul>
 *   <li>inputs that are already completed or marked obsolete are removed;
 *   <li>inputs beyond {@code maxTaskOutputAtOnce} are removed and handed back to {@code
 *       inputHost} under the same partition range so a later fetcher can pick them up.
 * </ul>
 *
 * <p>If the host still has pending partitions afterwards, it is re-added to {@code
 * pendingHosts}. Every input left in the range is flagged as scheduled for download in {@code
 * shuffleInfoEventsMap} (when an entry exists) and assigned to the returned fetcher.
 *
 * @param inputHost host whose pending inputs should be fetched
 * @param conf configuration passed through to the {@link FetcherBuilder}
 * @return a fetcher assigned to one partition range of {@code inputHost}
 */
Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
  Path lockDisk = null;
  if (sharedFetchEnabled) {
    // Pick a single lock disk from the edge name's hashcode + host hashcode.
    // Take the remainder before Math.abs: Math.abs(Integer.MIN_VALUE) is still negative, which
    // would produce a negative array index whenever the disk count is not a power of two.
    // For every other hash value this selects the same disk as Math.abs(h) % length.
    final int h = Objects.hashCode(this.srcNameTrimmed, inputHost.getHost());
    lockDisk = new Path(this.localDisks[Math.abs(h % this.localDisks.length)], "locks");
  }
  FetcherBuilder fetcherBuilder =
      new FetcherBuilder(
          RssShuffleManager.this,
          httpConnectionParams,
          inputManager,
          inputContext.getApplicationId(),
          inputContext.getDagIdentifier(),
          null,
          srcNameTrimmed,
          conf,
          localFs,
          localDirAllocator,
          lockDisk,
          localDiskFetchEnabled,
          sharedFetchEnabled,
          localhostName,
          shufflePort,
          asyncHttp,
          verifyDiskChecksum,
          compositeFetch);
  if (codec != null) {
    fetcherBuilder.setCompressionParameters(codec);
  }
  fetcherBuilder.setIFileParams(ifileReadAhead, ifileReadAheadLength);
  // Remove obsolete inputs from the list being given to the fetcher. Also
  // remove from the obsolete list.
  PartitionToInputs pendingInputsOfOnePartitionRange = inputHost.clearAndGetOnePartitionRange();
  int includedMaps = 0;
  for (Iterator<InputAttemptIdentifier> inputIter =
          pendingInputsOfOnePartitionRange.getInputs().iterator();
      inputIter.hasNext(); ) {
    InputAttemptIdentifier input = inputIter.next();
    // For pipelined shuffle.
    // NOTE(review): inputs rejected here are skipped but NOT removed from the list, so they are
    // still passed to assignWork below and are not counted against maxTaskOutputAtOnce.
    if (!validateInputAttemptForPipelinedShuffle(input)) {
      continue;
    }
    // Avoid adding attempts which have already completed.
    boolean alreadyCompleted;
    if (input instanceof CompositeInputAttemptIdentifier) {
      CompositeInputAttemptIdentifier compositeInput = (CompositeInputAttemptIdentifier) input;
      int firstId = compositeInput.getInputIdentifier();
      // Exclusive upper bound: the composite covers identifiers [firstId, endId).
      int endId = firstId + compositeInput.getInputIdentifierCount();
      // All identifiers in the range are complete exactly when the first clear bit at or after
      // firstId lies at or beyond endId.
      alreadyCompleted = completedInputSet.nextClearBit(firstId) >= endId;
    } else {
      alreadyCompleted = completedInputSet.get(input.getInputIdentifier());
    }
    // Avoid adding attempts which have already completed or have been marked as OBSOLETE
    if (alreadyCompleted || obsoletedInputs.contains(input)) {
      inputIter.remove();
      continue;
    }
    // Check if max threshold is met
    if (includedMaps >= maxTaskOutputAtOnce) {
      inputIter.remove();
      // Hand the input back to inputHost so a later fetcher can pick it up.
      inputHost.addKnownInput(
          pendingInputsOfOnePartitionRange.getPartition(),
          pendingInputsOfOnePartitionRange.getPartitionCount(),
          input);
    } else {
      includedMaps++;
    }
  }
  // Re-queue the host if it still has work left (including inputs handed back above).
  if (inputHost.getNumPendingPartitions() > 0) {
    pendingHosts.add(inputHost); // add it to queue
  }
  for (InputAttemptIdentifier input : pendingInputsOfOnePartitionRange.getInputs()) {
    ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier());
    if (eventInfo != null) {
      eventInfo.scheduledForDownload = true;
    }
  }
  fetcherBuilder.assignWork(
      inputHost.getHost(),
      inputHost.getPort(),
      pendingInputsOfOnePartitionRange.getPartition(),
      pendingInputsOfOnePartitionRange.getPartitionCount(),
      pendingInputsOfOnePartitionRange.getInputs());
  if (LOG.isDebugEnabled()) {
    LOG.debug(
        "Created Fetcher for host: "
            + inputHost.getHost()
            + ", info: "
            + inputHost.getAdditionalInfo()
            + ", with inputs: "
            + pendingInputsOfOnePartitionRange);
  }
  return fetcherBuilder.build();
}