public void copyFromRssServer()

in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcher.java [94:172]


  public void copyFromRssServer() throws IOException {
    CompressedShuffleBlock compressedBlock = null;
    ByteBuffer compressedData = null;
    long blockStartFetch = 0;
    // fetch a block
    if (!hasPendingData) {
      final long startFetch = System.currentTimeMillis();
      blockStartFetch = System.currentTimeMillis();
      compressedBlock = shuffleReadClient.readShuffleBlockData();
      if (compressedBlock != null) {
        compressedData = compressedBlock.getByteBuffer();
      }
      long fetchDuration = System.currentTimeMillis() - startFetch;
      readTime += fetchDuration;
    }

    // uncompress the block
    if (!hasPendingData && compressedData != null) {
      final long startDecompress = System.currentTimeMillis();
      int uncompressedLen = compressedBlock.getUncompressLength();
      ByteBuffer decompressedBuffer = ByteBuffer.allocate(uncompressedLen);
      codec.decompress(compressedData, uncompressedLen, decompressedBuffer, 0);
      uncompressedData = decompressedBuffer.array();
      unCompressionLength += compressedBlock.getUncompressLength();
      long decompressDuration = System.currentTimeMillis() - startDecompress;
      decompressTime += decompressDuration;
    }

    if (uncompressedData != null) {
      // start to merge
      final long startSerialization = System.currentTimeMillis();
      int compressedDataLength = compressedData != null ? compressedData.capacity() : 0;
      if (issueMapOutputMerge(compressedDataLength, blockStartFetch)) {
        long serializationDuration = System.currentTimeMillis() - startSerialization;
        serializeTime += serializationDuration;
        // if reserve successes, reset status for next fetch
        if (hasPendingData) {
          waitTime += System.currentTimeMillis() - startWait;
        }
        hasPendingData = false;
        uncompressedData = null;
      } else {
        LOG.info("UncompressedData is null");
        // if reserve fail, return and wait
        waitCount++;
        startWait = System.currentTimeMillis();
        return;
      }

      // update some status
      copyBlockCount++;
      copyTime = readTime + decompressTime + serializeTime + waitTime;
    } else {
      LOG.info("UncompressedData is null");
      // finish reading data, close related reader and check data consistent
      shuffleReadClient.close();
      shuffleReadClient.checkProcessedBlockIds();
      shuffleReadClient.logStatics();
      LOG.info(
          "Reduce task partition:"
              + partitionId
              + " read block cnt: "
              + copyBlockCount
              + " cost "
              + readTime
              + " ms to fetch and "
              + decompressTime
              + " ms to decompress with unCompressionLength["
              + unCompressionLength
              + "] and "
              + serializeTime
              + " ms to serialize and "
              + waitTime
              + " ms to wait resource, total copy time: "
              + copyTime);
      LOG.info("Stop fetcher");
      stopFetch();
    }
  }