Fetcher constructFetcherForHost()

in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java [696:799]


  /**
   * Builds a {@link Fetcher} for one partition range of the pending inputs of {@code inputHost}.
   *
   * <p>The range is taken from {@code inputHost.clearAndGetOnePartitionRange()}. Inputs that are
   * already recorded in {@code completedInputSet} or listed in {@code obsoletedInputs} are dropped
   * from it, and at most {@code maxTaskOutputAtOnce} inputs are kept; any surplus is handed back to
   * {@code inputHost}. If the host still has pending partitions afterwards it is re-added to
   * {@code pendingHosts}. Every input assigned to the fetcher has its {@code ShuffleEventInfo}
   * (when one exists) flagged as {@code scheduledForDownload}.
   *
   * @param inputHost host whose pending inputs should be fetched
   * @param conf configuration handed to the {@link FetcherBuilder}
   * @return a fetcher with the selected inputs assigned as its work
   */
  Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
    Path lockDisk = null;

    if (sharedFetchEnabled) {
      // Pick a single lock disk from the edge name's hashcode + host hashcode.
      // Reduce modulo the disk count BEFORE taking the absolute value: Math.abs(Integer.MIN_VALUE)
      // is still negative and would produce a negative array index. For every other hash value
      // this selects exactly the same disk as abs-then-mod.
      final int hash = Objects.hashCode(this.srcNameTrimmed, inputHost.getHost());
      final int diskIndex = Math.abs(hash % this.localDisks.length);
      lockDisk = new Path(this.localDisks[diskIndex], "locks");
    }

    FetcherBuilder fetcherBuilder =
        new FetcherBuilder(
            RssShuffleManager.this,
            httpConnectionParams,
            inputManager,
            inputContext.getApplicationId(),
            inputContext.getDagIdentifier(),
            null,
            srcNameTrimmed,
            conf,
            localFs,
            localDirAllocator,
            lockDisk,
            localDiskFetchEnabled,
            sharedFetchEnabled,
            localhostName,
            shufflePort,
            asyncHttp,
            verifyDiskChecksum,
            compositeFetch);

    if (codec != null) {
      fetcherBuilder.setCompressionParameters(codec);
    }
    fetcherBuilder.setIFileParams(ifileReadAhead, ifileReadAheadLength);

    // Filter the inputs handed to the fetcher: drop completed or obsolete attempts (obsoletedInputs
    // itself is left untouched) and return anything beyond maxTaskOutputAtOnce to the host.
    PartitionToInputs pendingInputsOfOnePartitionRange = inputHost.clearAndGetOnePartitionRange();
    int includedMaps = 0;
    Iterator<InputAttemptIdentifier> inputIter =
        pendingInputsOfOnePartitionRange.getInputs().iterator();
    while (inputIter.hasNext()) {
      InputAttemptIdentifier input = inputIter.next();

      // For pipelined shuffle. NOTE(review): a rejected input is skipped but NOT removed, so it
      // stays in the fetcher's work list and does not count toward maxTaskOutputAtOnce.
      if (!validateInputAttemptForPipelinedShuffle(input)) {
        continue;
      }

      // Avoid adding attempts which have already completed.
      boolean alreadyCompleted;
      if (input instanceof CompositeInputAttemptIdentifier) {
        // A composite input stands for getInputIdentifierCount() consecutive identifiers starting
        // at getInputIdentifier(), i.e. the half-open range [firstId, endId). It is complete when
        // every bit in that range is set, i.e. the first clear bit at or after firstId lies at or
        // beyond endId. (Comparing with '>' would also require bit endId, which belongs to a
        // different input, causing completed composites to be fetched again.)
        CompositeInputAttemptIdentifier compositeInput = (CompositeInputAttemptIdentifier) input;
        int firstId = compositeInput.getInputIdentifier();
        int endId = firstId + compositeInput.getInputIdentifierCount(); // exclusive
        alreadyCompleted = completedInputSet.nextClearBit(firstId) >= endId;
      } else {
        alreadyCompleted = completedInputSet.get(input.getInputIdentifier());
      }
      // Avoid adding attempts which have already completed or have been marked as OBSOLETE
      if (alreadyCompleted || obsoletedInputs.contains(input)) {
        inputIter.remove();
        continue;
      }

      // Check if max threshold is met
      if (includedMaps >= maxTaskOutputAtOnce) {
        inputIter.remove();
        // Hand the surplus input back to the host so a later fetcher picks it up.
        inputHost.addKnownInput(
            pendingInputsOfOnePartitionRange.getPartition(),
            pendingInputsOfOnePartitionRange.getPartitionCount(),
            input);
      } else {
        includedMaps++;
      }
    }
    if (inputHost.getNumPendingPartitions() > 0) {
      pendingHosts.add(inputHost); // add it to queue
    }
    // Flag the inputs that will actually be fetched. Kept as a separate pass after filtering;
    // NOTE(review): presumably so that validateInputAttemptForPipelinedShuffle, if it consults
    // scheduledForDownload, only sees state from earlier fetchers — confirm before merging loops.
    for (InputAttemptIdentifier input : pendingInputsOfOnePartitionRange.getInputs()) {
      ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier());
      if (eventInfo != null) {
        eventInfo.scheduledForDownload = true;
      }
    }
    fetcherBuilder.assignWork(
        inputHost.getHost(),
        inputHost.getPort(),
        pendingInputsOfOnePartitionRange.getPartition(),
        pendingInputsOfOnePartitionRange.getPartitionCount(),
        pendingInputsOfOnePartitionRange.getInputs());
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "Created Fetcher for host: "
              + inputHost.getHost()
              + ", info: "
              + inputHost.getAdditionalInfo()
              + ", with inputs: "
              + pendingInputsOfOnePartitionRange);
    }
    return fetcherBuilder.build();
  }