public FetchResult callInternal()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java [255:319]


  /**
   * Fetches the inputs listed in {@code srcAttempts} from a single host and
   * reports any failed inputs to {@code fetcherCallback}.
   *
   * <p>The method proceeds as follows:
   * <ol>
   *   <li>Returns immediately with a {@link FetchResult} built from
   *       {@code srcAttempts} when there is nothing to fetch.</li>
   *   <li>Populates the remaining-attempts map and registers every attempt in
   *       {@code pathToAttemptMap}; composite identifiers are expanded into one
   *       entry per contained input.</li>
   *   <li>Picks a fetch strategy: local disk when local fetch is enabled and
   *       the target host/port match this node, otherwise a shared fetch when
   *       every attempt is shared, otherwise HTTP.</li>
   *   <li>Forwards each failed input to {@code fetcherCallback} unless the
   *       fetcher has already been shut down, then calls {@code shutdown()}.</li>
   * </ol>
   *
   * @return the {@code fetchResult} carried by the host-level result of the
   *         chosen fetch strategy
   * @throws IOException if no failures were reported, attempts are still
   *         remaining, and the fetch was not a multiplexed (shared) one
   * @throws Exception anything propagated by the fetch helpers, or by the
   *         precondition check when a shared fetch is requested for a
   *         non-zero partition
   */
  public FetchResult callInternal() throws Exception {
    // Shared fetch is only a candidate when both features are switched on; it
    // is further narrowed below to "every attempt is shared".
    boolean multiplex = (this.sharedFetchEnabled && this.localDiskFetchEnabled);

    if (srcAttempts.size() == 0) {
      // Nothing to fetch.
      return new FetchResult(host, port, partition, partitionCount, srcAttempts);
    }

    populateRemainingMap(srcAttempts);
    for (InputAttemptIdentifier in : srcAttemptsRemaining.values()) {
      if (in instanceof CompositeInputAttemptIdentifier) {
        // A composite identifier covers several inputs: register each expanded
        // identifier under its own partition offset.
        CompositeInputAttemptIdentifier cin = (CompositeInputAttemptIdentifier)in;
        for (int i = 0; i < cin.getInputIdentifierCount(); i++) {
          pathToAttemptMap.put(new PathPartition(cin.getPathComponent(), partition + i), cin.expand(i));
        }
      } else {
        pathToAttemptMap.put(new PathPartition(in.getPathComponent(), 0), in);
      }

      // do only if all of them are shared fetches
      multiplex &= in.isShared();
    }

    if (multiplex) {
      // The template uses %s on purpose: Guava-style Preconditions templates
      // substitute only %s placeholders, so a %d would be emitted literally.
      Preconditions.checkArgument(partition == 0,
          "Shared fetches cannot be done for partitioned input"
              + " - partition is non-zero (%s)", partition);
    }

    HostFetchResult hostFetchResult;

    // Local fetch takes precedence over shared and HTTP fetches.
    boolean isLocalFetch = localDiskFetchEnabled && host.equals(localHostname) && port == shufflePort;
    if (isLocalFetch) {
      hostFetchResult = setupLocalDiskFetch();
    } else if (multiplex) {
      hostFetchResult = doSharedFetch();
    } else {
      hostFetchResult = doHttpFetch();
    }

    if (hostFetchResult.failedInputs != null && hostFetchResult.failedInputs.length > 0) {
      if (!isShutDown.get()) {
        LOG.warn("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs));
        for (InputAttemptFetchFailure left : hostFetchResult.failedInputs) {
          fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed);
        }
      } else if (isDebugEnabled) {
        // Failures observed after shutdown are not reported to the callback.
        LOG.debug("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length +
            " inputs since the fetcher has already been stopped");
      }
    }

    shutdown();

    // Sanity check: without reported failures every attempt should have been
    // consumed. Multiplexed (shared) fetches are exempt from this check.
    if (hostFetchResult.failedInputs == null && !srcAttemptsRemaining.isEmpty() && !multiplex) {
      throw new IOException("server didn't return all expected map outputs: "
          + srcAttemptsRemaining.size() + " left.");
    }

    return hostFetchResult.fetchResult;
  }