public synchronized void copySucceeded()

in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java [744:870]


  /**
   * Registers a successful fetch for {@code srcAttemptIdentifier}.
   *
   * <p>When the input has not been marked finished yet, the fetched output (if any) is committed,
   * the failure records kept for the attempt and the host are dropped, and the shuffle counters
   * are updated. The input is then marked finished either right away or, when the attempt can be
   * retrieved in chunks, only once its event info reports that it is done. When the input is
   * already finished, the call is treated as a duplicate fetch and the output (if any) is aborted.
   *
   * <p>The method is {@code synchronized}; once {@code remainingMaps} reaches zero it calls
   * {@code notifyAll()} on this object.
   *
   * @param srcAttemptIdentifier the source attempt that was fetched
   * @param host the host that served the data; when {@code null} no host failure record is removed
   * @param bytesCompressed compressed size, added to the shuffle byte counters and running total
   * @param bytesDecompressed decompressed size, added to the decompressed byte counter
   * @param millis value handed to the fetch stats logger (presumably the fetch duration in
   *     milliseconds; confirm against callers)
   * @param output the fetched output, or {@code null} when the completion is registered without
   *     fetching any data
   * @param isLocalFetch {@code false} for a non-local-disk copy, in which case {@code
   *     failedShufflesSinceLastCompletion} is reset to zero
   * @throws IOException declared by the signature; presumably raised by committing or aborting
   *     the output (not visible here)
   * @throws RssException if the attempt is retrieved in chunks, carries an output, and no event
   *     info is registered for its input identifier
   */
  public synchronized void copySucceeded(
      InputAttemptIdentifier srcAttemptIdentifier,
      MapHost host,
      long bytesCompressed,
      long bytesDecompressed,
      long millis,
      MapOutput output,
      boolean isLocalFetch)
      throws IOException {
    inputContext.notifyProgress();

    // Guard clause: the input was completed earlier, so this result is a duplicate.
    if (isInputFinished(srcAttemptIdentifier.getInputIdentifier())) {
      LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + srcAttemptIdentifier);
      // Free the resource (memory in particular). A source that generated no data has no output.
      if (output != null) {
        output.abort();
      }
      return;
    }

    // Only a non-local-disk copy resets the failure streak.
    if (!isLocalFetch) {
      failedShufflesSinceLastCompletion = 0;
    }

    if (output == null) {
      // No output: a physical input completion is being registered without fetching data.
      skippedInputCounter.increment(1);
    } else {
      // The attempt (and its host, when known) succeeded, so forget earlier failures.
      failureCounts.remove(srcAttemptIdentifier);
      if (host != null) {
        hostFailures.remove(new HostPort(host.getHost(), host.getPort()));
      }

      output.commit();
      fetchStatsLogger.logIndividualFetchComplete(
          millis,
          bytesCompressed,
          bytesDecompressed,
          output.getType().toString(),
          srcAttemptIdentifier);

      // Attribute the compressed bytes to the counter matching where the output landed.
      if (output.getType() == Type.DISK) {
        bytesShuffledToDisk.increment(bytesCompressed);
      } else if (output.getType() == Type.DISK_DIRECT) {
        bytesShuffledToDiskDirect.increment(bytesCompressed);
      } else {
        bytesShuffledToMem.increment(bytesCompressed);
      }
      shuffledInputsCounter.increment(1);
    }

    if (srcAttemptIdentifier.canRetrieveInputInChunks()) {
      // Pipelined shuffle: fetchers may have pulled the FINAL_UPDATE spill ahead of the others
      // (e.g. because it is small), so success is claimed only after every spill is accounted for.
      int inputId = srcAttemptIdentifier.getInputIdentifier();

      // Only a single task attempt is allowed to proceed.
      if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
        return;
      }

      ShuffleEventInfo spillInfo = pipelinedShuffleInfoEventsMap.get(inputId);
      if (spillInfo == null) {
        if (output != null) {
          throw new RssException("eventInfo should not be null");
        }
        // The shuffle event handler may have invoked this directly because of empty partitions.
        spillInfo = new ShuffleEventInfo(srcAttemptIdentifier);
        pipelinedShuffleInfoEventsMap.put(inputId, spillInfo);
      }

      spillInfo.spillProcessed(srcAttemptIdentifier.getSpillEventId());
      numFetchedSpills++;

      if (InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE
          == srcAttemptIdentifier.getFetchTypeInfo()) {
        spillInfo.setFinalEventId(srcAttemptIdentifier.getSpillEventId());
      }

      // Have all spills belonging to this InputAttemptIdentifier been downloaded?
      if (spillInfo.isDone()) {
        remainingMaps.decrementAndGet();
        setInputFinished(inputId);
        pipelinedShuffleInfoEventsMap.remove(inputId);
        if (LOG.isTraceEnabled()) {
          LOG.trace(
              "Removing : "
                  + srcAttemptIdentifier
                  + ", pending: "
                  + pipelinedShuffleInfoEventsMap);
        }
      }

      if (LOG.isTraceEnabled()) {
        LOG.trace("eventInfo " + spillInfo);
      }
    } else {
      // Not retrieved in chunks: this single fetch completes the input.
      remainingMaps.decrementAndGet();
      setInputFinished(srcAttemptIdentifier.getInputIdentifier());
      numFetchedSpills++;
    }

    if (remainingMaps.get() == 0) {
      notifyAll(); // Notify the getHost() method.
      LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
    }

    // Refresh progress bookkeeping and byte counters.
    lastProgressTime = System.currentTimeMillis();
    totalBytesShuffledTillNow += bytesCompressed;
    logProgress();
    reduceShuffleBytes.increment(bytesCompressed);
    reduceBytesDecompressed.increment(bytesDecompressed);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "src task: "
              + TezRuntimeUtils.getTaskAttemptIdentifier(
                  inputContext.getSourceVertexName(),
                  srcAttemptIdentifier.getInputIdentifier(),
                  srcAttemptIdentifier.getAttemptNumber())
              + " done");
    }
    // NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case
    // of speculation.
  }