public void addKnownInput()

in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java [798:879]


  /**
   * Registers a newly discovered source input and queues its host for fetching.
   *
   * <p>Steps, in order:
   *
   * <ol>
   *   <li>Look up (or create and register) the {@code InputHost} for {@code hostName:port} in
   *       {@code knownSrcHosts}.
   *   <li>Return early, without queuing anything, if {@code
   *       validateInputAttemptForPipelinedShuffle} rejects the attempt. The host stays registered
   *       in {@code knownSrcHosts} in that case.
   *   <li>For every expanded input identifier not yet present in {@code shuffleInfoEventsMap},
   *       record a new {@code ShuffleEventInfo}.
   *   <li>Add the input to the host, offer the host to {@code pendingHosts} and signal {@code
   *       wakeLoop} while holding {@code lock}.
   *   <li>While holding {@code lock} again, update {@code pendingPartition}, {@code
   *       allRssPartition} and {@code partitionToInput} for partitions {@code srcPhysicalIndex}
   *       through {@code srcPhysicalIndex + count - 1}, then increment {@code numWithDataInput}.
   * </ol>
   *
   * @param hostName host name used to build the {@code HostPort} key of the source host
   * @param port port used to build the {@code HostPort} key of the source host
   * @param srcAttemptIdentifier composite identifier of the source attempt; its {@code
   *     getInputIdentifierCount()} determines how many consecutive inputs/partitions are added
   * @param srcPhysicalIndex index of the first partition covered by this input
   * @throws TezUncheckedException if the host cannot be offered to the pending queue
   */
  public void addKnownInput(
      String hostName,
      int port,
      CompositeInputAttemptIdentifier srcAttemptIdentifier,
      int srcPhysicalIndex) {
    HostPort identifier = new HostPort(hostName, port);
    InputHost host = knownSrcHosts.get(identifier);
    if (host == null) {
      host = new InputHost(identifier);
      // putIfAbsent returns the existing entry if one appeared since the get above; use that one.
      InputHost old = knownSrcHosts.putIfAbsent(identifier, host);
      if (old != null) {
        host = old;
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          srcNameTrimmed + ": " + "Adding input: " + srcAttemptIdentifier + ", to host: " + host);
    }

    if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
      return;
    }
    int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
    // Read once: the same count drives both loops and the host registration below.
    final int inputCount = srcAttemptIdentifier.getInputIdentifierCount();
    for (int i = 0; i < inputCount; i++) {
      int eventKey = inputIdentifier + i;
      if (shuffleInfoEventsMap.get(eventKey) == null) {
        shuffleInfoEventsMap.put(eventKey, new ShuffleEventInfo(srcAttemptIdentifier.expand(i)));
        LOG.info(
            "AddKnownInput, srcAttemptIdentifier:{}, i:{}, expand:{}, map:{}",
            srcAttemptIdentifier,
            i,
            srcAttemptIdentifier.expand(i),
            shuffleInfoEventsMap);
      }
    }

    host.addKnownInput(srcPhysicalIndex, inputCount, srcAttemptIdentifier);
    // NOTE(review): wakeLoop is signalled here, before the partition bookkeeping further down is
    // updated under a second acquisition of the lock. Confirm the waiter does not depend on
    // pendingPartition already containing the new partitions when it wakes up.
    lock.lock();
    try {
      boolean added = pendingHosts.offer(host);
      if (!added) {
        String errorMessage = "Unable to add host: " + host.getIdentifier() + " to pending queue";
        LOG.error(errorMessage);
        throw new TezUncheckedException(errorMessage);
      }
      wakeLoop.signal();
    } finally {
      lock.unlock();
    }

    LOG.info(
        "AddKnowInput, hostname:{}, port:{}, srcAttemptIdentifier:{}, srcPhysicalIndex:{}",
        hostName,
        port,
        srcAttemptIdentifier,
        srcPhysicalIndex);

    lock.lock();
    try {
      for (int i = 0; i < inputCount; i++) {
        int p = srcPhysicalIndex + i;
        LOG.info(
            "PartitionToInput, original:{}, add:{},  now:{}",
            srcAttemptIdentifier,
            srcAttemptIdentifier.expand(i),
            partitionToInput.get(p));
        // Only partitions seen for the first time are queued as pending.
        if (!allRssPartition.contains(p)) {
          pendingPartition.add(p);
        }
        allRssPartition.add(p);
        // Single lookup; the list is only allocated when the partition has no entry yet.
        partitionToInput.computeIfAbsent(p, key -> new ArrayList<>()).add(srcAttemptIdentifier);
        LOG.info("Add partition:{}, after add, now partition:{}", p, allRssPartition);
      }

      numWithDataInput.incrementAndGet();
      LOG.info("numWithDataInput:{}.", numWithDataInput.get());
    } finally {
      lock.unlock();
    }
  }