client-tez/src/main/java/org/apache/tez/runtime/library/input/RMRssOrderedGroupedKVInput.java [124:153]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    this.shuffleId =
        RssTezUtils.computeShuffleId(tezDAGID.getId(), sourceVertexId, destinationVertexId);
    this.applicationAttemptId =
        ApplicationAttemptId.newInstance(
            getContext().getApplicationId(), getContext().getDAGAttemptNumber());
    return Collections.emptyList();
  }

  @Override
  public synchronized void start() throws IOException {
    if (!isStarted.get()) {
      memoryUpdateCallbackHandler.validateUpdateReceived();
      // Start the shuffle - copy and merge
      shuffle = createRssShuffle();
      shuffle.run();
      List<Event> pending = new LinkedList<>();
      pendingEvents.drainTo(pending);
      if (pending.size() > 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "NoAutoStart delay in processing first event: "
                  + (System.currentTimeMillis() - firstEventReceivedTime));
        }
        shuffle.handleEvents(pending);
      }
      isStarted.set(true);
    }
  }

  @VisibleForTesting
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInput.java [145:174]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    this.shuffleId =
        RssTezUtils.computeShuffleId(tezDAGID.getId(), sourceVertexId, destinationVertexId);
    this.applicationAttemptId =
        ApplicationAttemptId.newInstance(
            getContext().getApplicationId(), getContext().getDAGAttemptNumber());
    return Collections.emptyList();
  }

  @Override
  public synchronized void start() throws IOException {
    if (!isStarted.get()) {
      memoryUpdateCallbackHandler.validateUpdateReceived();
      // Start the shuffle - copy and merge
      shuffle = createRssShuffle();
      shuffle.run();
      List<Event> pending = new LinkedList<>();
      pendingEvents.drainTo(pending);
      if (pending.size() > 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "NoAutoStart delay in processing first event: "
                  + (System.currentTimeMillis() - firstEventReceivedTime));
        }
        shuffle.handleEvents(pending);
      }
      isStarted.set(true);
    }
  }

  @VisibleForTesting
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



