in server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java [177:257]
/**
 * Collects in-memory data for a read request into {@code bufferSegments} and {@code resultBlocks}.
 *
 * <p>Batches that are currently being flushed ({@code inFlushBlockMap}) are read first, in the
 * order returned by {@code sortFlushingEventId}, and the still-cached {@code blocks} are read
 * afterwards. The intent is to return data in the order it was received. When {@code lastBlockId}
 * is {@code Constants.INVALID_BLOCK_ID}, every visited batch is read from its start. Otherwise
 * {@code updateSegmentsWithBlockId} is used until it reports that {@code lastBlockId} was found.
 * From then on the following batches are read from their start. The loop over in-flush batches
 * stops as soon as the accumulated segment length reaches {@code readBufferSize}.
 *
 * <p>If {@code lastBlockId} was given but not found in memory, and no data was collected while
 * memory still holds blocks, the whole read is retried once from the beginning. The likely cause
 * is that the block has already been flushed.
 *
 * @param lastBlockId id of the block the reader last consumed, or {@code
 *     Constants.INVALID_BLOCK_ID} to read from the beginning
 * @param readBufferSize target amount of data to collect (compared against
 *     {@code calculateDataLength(bufferSegments)})
 * @param bufferSegments output list, filled by the segment-update helpers
 * @param resultBlocks output list, filled by the segment-update helpers
 * @param expectedTaskIds passed through to the segment-update helpers (presumably used to filter
 *     blocks by task id — confirm against the helpers)
 */
protected void updateBufferSegmentsAndResultBlocks(
    long lastBlockId,
    long readBufferSize,
    List<BufferSegment> bufferSegments,
    List<ShufflePartitionedBlock> resultBlocks,
    Roaring64NavigableMap expectedTaskIds) {
  long nextBlockId = lastBlockId;
  List<Long> eventIdList = Lists.newArrayList(inFlushBlockMap.keySet());
  List<Long> sortedEventId = sortFlushingEventId(eventIdList);
  int offset = 0;
  boolean hasLastBlockId = false;
  // Read from inFlushBlockMap first so that data is returned in the order it was received.
  // The number of events is the number of batches currently being flushed. It should stay
  // below 5; a larger number suggests a problem with the storage.
  if (!inFlushBlockMap.isEmpty()) {
    for (Long eventId : sortedEventId) {
      // Choose the update strategy depending on whether we still have to locate lastBlockId.
      if (nextBlockId == Constants.INVALID_BLOCK_ID) {
        updateSegmentsWithoutBlockId(
            offset,
            inFlushBlockMap.get(eventId),
            readBufferSize,
            bufferSegments,
            resultBlocks,
            expectedTaskIds);
        hasLastBlockId = true;
      } else {
        hasLastBlockId =
            updateSegmentsWithBlockId(
                offset,
                inFlushBlockMap.get(eventId),
                readBufferSize,
                nextBlockId,
                bufferSegments,
                resultBlocks,
                expectedTaskIds);
        // Once lastBlockId has been found, later batches are read from their first block.
        if (hasLastBlockId) {
          nextBlockId = Constants.INVALID_BLOCK_ID;
        }
      }
      if (!bufferSegments.isEmpty()) {
        offset = calculateDataLength(bufferSegments);
      }
      if (offset >= readBufferSize) {
        break;
      }
    }
  }
  // Then read from the cached blocks that are not part of any flush event.
  if (!blocks.isEmpty() && offset < readBufferSize) {
    if (nextBlockId == Constants.INVALID_BLOCK_ID) {
      updateSegmentsWithoutBlockId(
          offset, blocks, readBufferSize, bufferSegments, resultBlocks, expectedTaskIds);
      hasLastBlockId = true;
    } else {
      hasLastBlockId =
          updateSegmentsWithBlockId(
              offset,
              blocks,
              readBufferSize,
              nextBlockId,
              bufferSegments,
              resultBlocks,
              expectedTaskIds);
    }
  }
  // lastBlockId was not found (it has most likely been flushed already), but memory still holds
  // data, so retry from the first block. Skip the retry when this call already started from
  // INVALID_BLOCK_ID. Repeating an identical call cannot find anything new. For example, with
  // only cached blocks and readBufferSize <= 0, it would recurse until the stack overflows.
  if (lastBlockId != Constants.INVALID_BLOCK_ID
      && (!inFlushBlockMap.isEmpty() || !blocks.isEmpty())
      && offset == 0
      && !hasLastBlockId) {
    updateBufferSegmentsAndResultBlocks(
        Constants.INVALID_BLOCK_ID,
        readBufferSize,
        bufferSegments,
        resultBlocks,
        expectedTaskIds);
  }
}