in client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java [254:339]
/**
 * Returns the next shuffle block that is still required by this reader.
 *
 * <p>Segments are polled from {@code bufferSegmentQueue}; when the queue is empty, {@code read()}
 * is called to refill it. A segment is returned only if its block id has not been processed yet,
 * is contained in {@code blockIdBitmap}, and its task attempt id is contained in {@code
 * taskIdBitmap}. Every other segment is reported to {@code clientReadHandler} as skipped and its
 * block id is marked as processed.
 *
 * <p>Before a segment is returned, its CRC is recomputed over {@code readBuffer} and compared with
 * the CRC recorded in the segment. On mismatch the segment is skipped (with a warning) when more
 * than one shuffle server is configured, otherwise an exception is thrown.
 *
 * <p>The returned buffer is a {@link ByteBuffer#duplicate() duplicate} of {@code readBuffer} whose
 * position and limit delimit the segment, so it shares content with {@code readBuffer}.
 *
 * @return the next required block, or {@code null} if {@code blockIdBitmap} is empty, no block is
 *     pending, or {@code read()} returned a non-positive value
 * @throws RssFetchFailedException if the CRC check fails and {@code shuffleServerInfoList} holds
 *     at most one server
 */
public CompressedShuffleBlock readShuffleBlockData() {
  // Loop instead of recursing: when a whole batch of segments is filtered out we must try the
  // next batch, and Java does not eliminate tail calls, so recursion here could exhaust the stack
  // if many consecutive batches contain no usable block.
  while (true) {
    // empty data expected, just return null
    if (blockIdBitmap.isEmpty()) {
      return null;
    }
    // All blocks are processed, so just return
    if (pendingBlockIds.isEmpty()) {
      return null;
    }
    // if client need request new data from shuffle server
    if (bufferSegmentQueue.isEmpty() && read() <= 0) {
      return null;
    }

    // blocks in bufferSegmentQueue may be from different partition in range partition mode,
    // or may be from speculation task, filter them and just read the necessary block
    BufferSegment bs;
    while ((bs = bufferSegmentQueue.poll()) != null) {
      final long blockId = bs.getBlockId();

      // check 1: if blockId is processed
      // check 2: if blockId is required for current partition
      // check 3: if blockId is generated by required task
      boolean required =
          !processedBlockIds.contains(blockId)
              && blockIdBitmap.contains(blockId)
              && taskIdBitmap.contains(bs.getTaskAttemptId());
      if (!required) {
        // not needed by this reader: account for it as skipped and never look at it again
        clientReadHandler.updateConsumedBlockInfo(bs, true);
        processedBlockIds.addLong(blockId);
        pendingBlockIds.removeLong(blockId);
        continue;
      }

      long expectedCrc = -1;
      long actualCrc = -1;
      try {
        long start = System.currentTimeMillis();
        expectedCrc = bs.getCrc();
        actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(), bs.getLength());
        crcCheckTime.addAndGet(System.currentTimeMillis() - start);
      } catch (Exception e) {
        // Deliberately not rethrown: actualCrc keeps its -1 sentinel, so the comparison below
        // decides how the failure is handled.
        LOG.warn("Can't read data for blockId[" + blockId + "]", e);
      }

      if (expectedCrc != actualCrc) {
        String errMsg =
            "Unexpected crc value for blockId["
                + blockId
                + "], expected:"
                + expectedCrc
                + ", actual:"
                + actualCrc;
        // If some blocks of one replica are corrupted, other replicas may still be intact,
        // so no exception is thrown here when blocks have multiple replicas.
        if (shuffleServerInfoList.size() > 1) {
          LOG.warn(errMsg);
          // The block id stays pending (it is not marked as processed here).
          clientReadHandler.updateConsumedBlockInfo(bs, true);
          continue;
        }
        throw new RssFetchFailedException(errMsg);
      }

      // mark block as processed
      processedBlockIds.addLong(blockId);
      pendingBlockIds.removeLong(blockId);
      // only update the statistics of necessary blocks
      clientReadHandler.updateConsumedBlockInfo(bs, false);
      break;
    }

    if (bs != null) {
      // Expose only this segment's bytes through an independent position/limit view.
      ByteBuffer compressedBuffer = readBuffer.duplicate();
      compressedBuffer.position(bs.getOffset());
      compressedBuffer.limit(bs.getOffset() + bs.getLength());
      return new CompressedShuffleBlock(compressedBuffer, bs.getUncompressLength());
    }
    // current batch had no usable segment, go around and try the next one
  }
}