public CompressedShuffleBlock readShuffleBlockData()

in client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java [254:339]


  /**
   * Returns the next block that this reader needs and whose CRC checks out.
   *
   * <p>Buffer segments are taken from {@code bufferSegmentQueue}. When the queue is empty, a new
   * batch is fetched with {@link #read()}. Some segments are skipped and marked as processed:
   * <ul>
   *   <li>segments already processed;
   *   <li>segments not in {@code blockIdBitmap};
   *   <li>segments whose task attempt is not in {@code taskIdBitmap}.
   * </ul>
   * Segments of the second and third kind can show up in range-partition mode or because of
   * speculative task attempts.
   *
   * @return a view over {@code readBuffer} covering the selected block's bytes (created via
   *     {@link ByteBuffer#duplicate()}, so no copy is made). Returns {@code null} when no blocks
   *     are expected, when every pending block has been processed, or when {@link #read()}
   *     returns a non-positive value.
   * @throws RssFetchFailedException if a block's CRC does not match and there is only one
   *     shuffle server (no other replica to fall back to)
   */
  public CompressedShuffleBlock readShuffleBlockData() {
    // Iterate rather than recurse: a long sequence of fetched batches that contain no needed
    // block used to add one stack frame per batch and could overflow the stack.
    while (true) {
      // empty data expected, just return null
      if (blockIdBitmap.isEmpty()) {
        return null;
      }

      // All blocks are processed, so just return
      if (pendingBlockIds.isEmpty()) {
        return null;
      }

      // Ask the shuffle server for more data only once the local queue is drained.
      if (bufferSegmentQueue.isEmpty() && read() <= 0) {
        return null;
      }

      BufferSegment bs;
      while ((bs = bufferSegmentQueue.poll()) != null) {
        // check 1: blockId not yet processed
        // check 2: blockId is required for the current partition
        // check 3: blockId was generated by a required task attempt
        boolean required =
            !processedBlockIds.contains(bs.getBlockId())
                && blockIdBitmap.contains(bs.getBlockId())
                && taskIdBitmap.contains(bs.getTaskAttemptId());
        if (!required) {
          // Unneeded segment: report it as skipped and never consider this blockId again.
          clientReadHandler.updateConsumedBlockInfo(bs, true);
          processedBlockIds.addLong(bs.getBlockId());
          pendingBlockIds.removeLong(bs.getBlockId());
          continue;
        }

        // -1 acts as a sentinel: if the CRC computation fails, the values differ and the block
        // is treated as corrupted below.
        long expectedCrc = -1;
        long actualCrc = -1;
        try {
          long start = System.currentTimeMillis();
          expectedCrc = bs.getCrc();
          actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(), bs.getLength());
          crcCheckTime.addAndGet(System.currentTimeMillis() - start);
        } catch (Exception e) {
          LOG.warn("Can't read data for blockId[" + bs.getBlockId() + "]", e);
        }

        if (expectedCrc != actualCrc) {
          String errMsg =
              "Unexpected crc value for blockId["
                  + bs.getBlockId()
                  + "], expected:"
                  + expectedCrc
                  + ", actual:"
                  + actualCrc;
          // Some blocks of one replica may be corrupted while other replicas are intact, so only
          // fail hard when there is no other replica. The block is deliberately NOT marked as
          // processed here, so a copy from another replica can still be accepted.
          if (shuffleServerInfoList.size() > 1) {
            LOG.warn(errMsg);
            clientReadHandler.updateConsumedBlockInfo(bs, true);
            continue;
          }
          throw new RssFetchFailedException(errMsg);
        }

        // mark block as processed
        processedBlockIds.addLong(bs.getBlockId());
        pendingBlockIds.removeLong(bs.getBlockId());
        // only update the statistics of necessary blocks
        clientReadHandler.updateConsumedBlockInfo(bs, false);

        // Duplicate so the caller gets independent position/limit over the shared content.
        ByteBuffer compressedBuffer = readBuffer.duplicate();
        compressedBuffer.position(bs.getOffset());
        compressedBuffer.limit(bs.getOffset() + bs.getLength());
        return new CompressedShuffleBlock(compressedBuffer, bs.getUncompressLength());
      }
      // The current batch held no usable block; loop around and try the next batch.
    }
  }