public synchronized void copySucceeded()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java [581:699]


  /**
   * Registers a successful copy (or a data-less completion) for the given source attempt.
   *
   * <p>If the input is already marked finished, the call is treated as a duplicate fetch: a
   * warning is logged and {@code output}, when present, is aborted. Otherwise the output (if any)
   * is committed, the shuffle counters are updated, and the input is marked finished either
   * immediately (input cannot be retrieved in chunks) or once every spill of a pipelined input
   * has been processed. When no maps remain, threads waiting on this object are woken up.
   *
   * @param srcAttemptIdentifier identifies the source attempt whose copy completed
   * @param host host the data came from; when {@code null} the host-failure record is left as is
   * @param bytesCompressed compressed byte count, added to the shuffle byte counters
   * @param bytesDecompressed decompressed byte count, added to the decompressed-bytes counter
   * @param millis forwarded to the fetch stats logger (presumably the fetch duration; confirm)
   * @param output the fetched output, or {@code null} when a completion is registered without
   *               fetching any data
   * @param isLocalFetch {@code true} for a local-disk copy; the count of failed shuffles since
   *                     the last completion is reset only when this is {@code false}
   * @throws IOException declared by this method; NOTE(review): within this body it can
   *                     presumably only originate from {@code output.commit()} - confirm
   */
  public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier,
                                         MapHost host,
                                         long bytesCompressed,
                                         long bytesDecompressed,
                                         long millis,
                                         MapOutput output,
                                         boolean isLocalFetch
                                         ) throws IOException {

    inputContext.notifyProgress();

    // Guard: the input was already completed earlier, so this is a duplicate fetch.
    if (isInputFinished(srcAttemptIdentifier.getInputIdentifier())) {
      LOG.warn("Duplicate fetch of input no longer needs to be fetched: "
          + srcAttemptIdentifier);
      // Free the resources held by the redundant copy - memory in particular.
      // A source that generated no data hands us a null output, so there is nothing to free.
      if (output != null) {
        output.abort();
      }
      return;
    }

    // Only a non-local-disk copy resets the failed-shuffle count.
    if (!isLocalFetch) {
      failedShufflesSinceLastCompletion = 0;
    }

    if (output == null) {
      // A null output means a physical input completion is being registered
      // without any data having to be fetched.
      skippedInputCounter.increment(1);
    } else {
      // The attempt (and its host, when known) succeeded: drop their failure records.
      failureCounts.remove(srcAttemptIdentifier);
      if (host != null) {
        hostFailures.remove(new HostPort(host.getHost(), host.getPort()));
      }

      output.commit();
      final Type committedType = output.getType();
      fetchStatsLogger.logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed,
          committedType.toString(), srcAttemptIdentifier);

      // Attribute the compressed bytes to the counter matching where the output landed.
      if (committedType == Type.DISK) {
        bytesShuffledToDisk.increment(bytesCompressed);
      } else if (committedType == Type.DISK_DIRECT) {
        bytesShuffledToDiskDirect.increment(bytesCompressed);
      } else {
        bytesShuffledToMem.increment(bytesCompressed);
      }
      shuffledInputsCounter.increment(1);
    }

    if (srcAttemptIdentifier.canRetrieveInputInChunks()) {
      // Pipelined shuffle: fetchers may well have pulled the FINAL_UPDATE spill ahead of the
      // others because its output is smaller. Success can be claimed only after the details of
      // all spills have been retrieved.
      final int inputId = srcAttemptIdentifier.getInputIdentifier();

      // Allow only one task attempt to proceed.
      if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
        return;
      }

      ShuffleEventInfo spillInfo = pipelinedShuffleInfoEventsMap.get(inputId);

      // The shuffle event handler may have invoked this directly because of empty partitions,
      // in which case no entry has been registered yet.
      if (spillInfo == null && output == null) {
        spillInfo = new ShuffleEventInfo(srcAttemptIdentifier);
        pipelinedShuffleInfoEventsMap.put(inputId, spillInfo);
      }

      // With assertions disabled, a missing entry surfaces as a NullPointerException below.
      assert(spillInfo != null);
      spillInfo.spillProcessed(srcAttemptIdentifier.getSpillEventId());
      numFetchedSpills++;

      if (srcAttemptIdentifier.getFetchTypeInfo() == InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE) {
        spillInfo.setFinalEventId(srcAttemptIdentifier.getSpillEventId());
      }

      // Have all spills pertaining to this InputAttemptIdentifier been downloaded?
      if (spillInfo.isDone()) {
        remainingMaps.decrementAndGet();
        setInputFinished(inputId);
        pipelinedShuffleInfoEventsMap.remove(inputId);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " +
              pipelinedShuffleInfoEventsMap);
        }
      }

      if (LOG.isTraceEnabled()) {
        LOG.trace("eventInfo " + spillInfo);
      }
    } else {
      // Non-chunked input: a single successful copy completes it.
      remainingMaps.decrementAndGet();
      setInputFinished(srcAttemptIdentifier.getInputIdentifier());
      numFetchedSpills++;
    }

    if (remainingMaps.get() == 0) {
      notifyAll(); // Notify the getHost() method.
      LOG.info("All inputs fetched for input vertex : " + inputContext.getInputOutputVertexNames());
    }

    // Refresh the progress/status bookkeeping.
    lastProgressTime = System.currentTimeMillis();
    totalBytesShuffledTillNow += bytesCompressed;
    logProgress();
    reduceShuffleBytes.increment(bytesCompressed);
    reduceBytesDecompressed.increment(bytesDecompressed);
    if (LOG.isDebugEnabled()) {
      LOG.debug("src task: "
          + TezRuntimeUtils.getTaskAttemptIdentifier(
              inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier(),
              srcAttemptIdentifier.getAttemptNumber()) + " done");
    }
    // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
  }