public CompressedShuffleBlock readShuffleBlockData()

in client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java [206:293]


  /**
   * Returns the next required compressed shuffle block, fetching more data from the shuffle server
   * when the local segment queue has been drained.
   *
   * <p>Segments that are not needed by this reader are consumed and marked as processed without
   * being returned. A segment is not needed if it was already processed, is absent from {@code
   * blockIdBitmap`, or comes from a task attempt outside {@code taskIdBitmap}.
   *
   * @return a block whose buffer is a view over {@code readBuffer}, limited to the segment's
   *     offset and length. Returns {@code null} when no blocks are expected, when every expected
   *     block has been processed, or when {@code read()} reports no further data.
   * @throws RssFetchFailedException if a required segment fails its CRC check and the shuffle server
   *     list contains at most one entry
   */
  public CompressedShuffleBlock readShuffleBlockData() {
    for (;;) {
      // Nothing expected at all, or everything expected has already been handled.
      if (blockIdBitmap.isEmpty() || pendingBlockIds.isEmpty()) {
        return null;
      }

      // Ask the shuffle server for more segments only once the local queue is empty.
      if (bufferSegmentQueue.isEmpty() && read() <= 0) {
        return null;
      }

      BufferSegment accepted = pollNextRequiredSegment();
      if (accepted == null) {
        // The queue was drained without yielding a usable segment; try the next batch.
        continue;
      }

      // Slice a private view so the caller cannot disturb readBuffer's own position/limit.
      ByteBuffer slice = readBuffer.duplicate();
      slice.position(accepted.getOffset());
      slice.limit(accepted.getOffset() + accepted.getLength());
      return new CompressedShuffleBlock(slice, accepted.getUncompressLength());
    }
  }

  /**
   * Polls {@code bufferSegmentQueue} until it finds a segment that is still required and whose
   * computed CRC equals {@code getCrc()}.
   *
   * <p>The queue can hold segments for other partitions (range partition mode) or from speculative
   * task attempts, so each polled segment is filtered first:
   *
   * <ul>
   *   <li>An unwanted segment is reported as skipped and recorded as processed.
   *   <li>A wanted segment that passes the CRC check is recorded as processed, reported as
   *       consumed, and returned.
   *   <li>A wanted segment with a CRC mismatch is reported as skipped but NOT recorded as
   *       processed when more than one shuffle server is listed. This leaves room for another
   *       replica to supply the block.
   * </ul>
   *
   * @return the accepted segment, or {@code null} if the queue ran out first
   * @throws RssFetchFailedException on a CRC mismatch when at most one shuffle server is listed
   */
  private BufferSegment pollNextRequiredSegment() {
    for (BufferSegment segment = bufferSegmentQueue.poll();
        segment != null;
        segment = bufferSegmentQueue.poll()) {
      long blockId = segment.getBlockId();

      // Required = not yet processed, belongs to this partition's bitmap, and is written by an
      // accepted task attempt.
      boolean wanted =
          !processedBlockIds.contains(blockId)
              && blockIdBitmap.contains(blockId)
              && taskIdBitmap.contains(segment.getTaskAttemptId());
      if (!wanted) {
        clientReadHandler.updateConsumedBlockInfo(segment, true);
        processedBlockIds.addLong(blockId);
        pendingBlockIds.removeLong(blockId);
        continue;
      }

      // -1 on both sides unless the CRC computation below completes; if getCrc32 throws after
      // expectedCrc is assigned, the values differ and the segment is treated as corrupt.
      long expectedCrc = -1;
      long actualCrc = -1;
      try {
        long crcStart = System.currentTimeMillis();
        expectedCrc = segment.getCrc();
        actualCrc = ChecksumUtils.getCrc32(readBuffer, segment.getOffset(), segment.getLength());
        crcCheckTime.addAndGet(System.currentTimeMillis() - crcStart);
      } catch (Exception e) {
        LOG.warn("Can't read data for " + blockIdLayout.asBlockId(blockId), e);
      }

      if (expectedCrc == actualCrc) {
        processedBlockIds.addLong(blockId);
        pendingBlockIds.removeLong(blockId);
        // Only required blocks contribute to the consumed-data statistics.
        clientReadHandler.updateConsumedBlockInfo(segment, false);
        return segment;
      }

      String errMsg =
          "Unexpected crc value for "
              + blockIdLayout.asBlockId(blockId)
              + ", expected:"
              + expectedCrc
              + ", actual:"
              + actualCrc;
      if (shuffleServerInfoList.size() <= 1) {
        // No other replica can supply this block, so corruption is fatal.
        throw new RssFetchFailedException(errMsg);
      }
      // Another replica may hold an intact copy: skip this one without marking it processed.
      LOG.warn(errMsg);
      clientReadHandler.updateConsumedBlockInfo(segment, true);
    }
    return null;
  }