public FetchResult call()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java [108:208]


  public FetchResult call() throws Exception {
    if (srcAttempts.size() == 0) {
      return new FetchResult(host, port, partition, srcAttempts);
    }

    for (InputAttemptIdentifier in : srcAttempts) {
      pathToAttemptMap.put(in.getPathComponent(), in);
    }

    remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);

    try {
      StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
        port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
      this.url = ShuffleUtils.constructInputURL(baseURI.toString(), srcAttempts,
        httpConnectionParams.getKeepAlive());

      httpConnection = new HttpConnection(url, httpConnectionParams, logIdentifier, shuffleSecret);
      httpConnection.connect();
    } catch (IOException e) {
      // ioErrs.increment(1);
      // If connect did not succeed, just mark all the maps as failed,
      // indirectly penalizing the host
      if (isShutDown.get()) {
        LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
      } else {
        for (Iterator<InputAttemptIdentifier> leftIter = remaining.iterator(); leftIter
            .hasNext();) {
          fetcherCallback.fetchFailed(host, leftIter.next(), true);
        }
      }
      return new FetchResult(host, port, partition, remaining);
    }
    if (isShutDown.get()) {
      // shutdown would have no effect if in the process of establishing the connection.
      shutdownInternal();
      LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
      return new FetchResult(host, port, partition, remaining);
    }

    try {
      input = httpConnection.getInputStream();
      httpConnection.validate();
      //validateConnectionResponse(msgToEncode, encHash);
    } catch (IOException e) {
      // ioErrs.increment(1);
      // If we got a read error at this stage, it implies there was a problem
      // with the first map, typically lost map. So, penalize only that map
      // and add the rest
      if (isShutDown.get()) {
        LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
      } else {
        InputAttemptIdentifier firstAttempt = srcAttempts.get(0);
        LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt
            + " Informing ShuffleManager: ", e);
        fetcherCallback.fetchFailed(host, firstAttempt, false);
        return new FetchResult(host, port, partition, remaining);
      }
    }

    // By this point, the connection is setup and the response has been
    // validated.

    // Handle any shutdown which may have been invoked.
    if (isShutDown.get()) {
      // shutdown would have no effect if in the process of establishing the connection.
      shutdownInternal();
      LOG.info("Detected fetcher has been shutdown after opening stream. Returning");
      return new FetchResult(host, port, partition, remaining);
    }
    // After this point, closing the stream and connection, should cause a
    // SocketException,
    // which will be ignored since shutdown has been invoked.

    // Loop through available map-outputs and fetch them
    // On any error, faildTasks is not null and we exit
    // after putting back the remaining maps to the
    // yet_to_be_fetched list and marking the failed tasks.
    InputAttemptIdentifier[] failedInputs = null;
    while (!remaining.isEmpty() && failedInputs == null) {
      failedInputs = fetchInputs(input);
    }

    if (failedInputs != null && failedInputs.length > 0) {
      LOG.warn("copyInputs failed for tasks " + Arrays.toString(failedInputs));
      for (InputAttemptIdentifier left : failedInputs) {
        fetcherCallback.fetchFailed(host, left, false);
      }
    }

    shutdown();

    // Sanity check
    if (failedInputs == null && !remaining.isEmpty()) {
      throw new IOException("server didn't return all expected map outputs: "
          + remaining.size() + " left.");
    }

    return new FetchResult(host, port, partition, remaining);

  }