in common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java [42:102]
/**
 * Computes chunk boundary offsets for reading the blocks of map indices in
 * {@code [startMapIndex, maxMapIndex)} from a partition's {@code indexMap}.
 *
 * <p>Map indices are visited in ascending order. Blocks of one map index are visited in list
 * order. Map indices missing from {@code indexMap}, or mapped to {@code null}, are skipped. In
 * the returned list, each pair of consecutive offsets delimits one chunk.
 *
 * @param startMapIndex first map index to include (inclusive)
 * @param endMapIndex map index to stop at (exclusive), or {@link Integer#MAX_VALUE} for a
 *     non-range read that covers up to and including the largest key of {@code indexMap}
 * @param fetchChunkSize chunk size threshold, in the same unit as the blocks' {@code offset}
 *     and {@code length} fields
 * @param indexMap blocks keyed by map index
 * @param isInMemory if {@code true}, offsets are relative to a new buffer holding only the
 *     selected blocks, so they start at 0. If {@code false}, offsets are the blocks' own
 *     {@code offset} values.
 * @return the chunk offsets. For in-memory reads the list always starts with {@code 0}. For
 *     on-disk reads the list is empty when no block was visited.
 */
public static List<Long> getChunkOffsetsFromShuffleBlockInfos(
    int startMapIndex,
    int endMapIndex,
    long fetchChunkSize,
    Map<Integer, List<ShuffleBlockInfo>> indexMap,
    boolean isInMemory) {
  int maxMapIndex = endMapIndex;
  if (endMapIndex == Integer.MAX_VALUE) {
    // Not a range read: cover up to and including the largest map index present.
    // An empty index map yields 0, so no blocks are visited instead of Optional.get() failing.
    maxMapIndex = indexMap.keySet().stream().mapToInt(Integer::intValue).max().orElse(-1) + 1;
  }
  return isInMemory
      ? computeInMemoryChunkOffsets(startMapIndex, maxMapIndex, fetchChunkSize, indexMap)
      : computeOnDiskChunkOffsets(startMapIndex, maxMapIndex, fetchChunkSize, indexMap);
}

/** Builds chunk offsets relative to a fresh buffer that contains only the selected blocks. */
private static List<Long> computeInMemoryChunkOffsets(
    int startMapIndex,
    int maxMapIndex,
    long fetchChunkSize,
    Map<Integer, List<ShuffleBlockInfo>> indexMap) {
  List<Long> chunkOffsets = new ArrayList<>();
  // The fetch handler serves a new composite byte buf holding only the required data.
  // It does not reuse the memory file's original buffer, so offsets start from 0.
  chunkOffsets.add(0L);
  long currentChunkOffset = 0;
  long lastChunkOffset = 0;
  for (int i = startMapIndex; i < maxMapIndex; i++) {
    List<ShuffleBlockInfo> blockInfos = indexMap.get(i);
    if (blockInfos == null) {
      continue;
    }
    for (ShuffleBlockInfo info : blockInfos) {
      currentChunkOffset += info.length;
      // Close the chunk right after the block that pushes it strictly past fetchChunkSize.
      // NOTE(review): the on-disk path splits on >= instead of >; kept to preserve chunk layout.
      if (currentChunkOffset - lastChunkOffset > fetchChunkSize) {
        lastChunkOffset = currentChunkOffset;
        chunkOffsets.add(currentChunkOffset);
      }
    }
  }
  // Close the trailing partial chunk, if any data follows the last boundary.
  if (lastChunkOffset != currentChunkOffset) {
    chunkOffsets.add(currentChunkOffset);
  }
  return chunkOffsets;
}

/** Builds chunk offsets from the blocks' own offsets within the on-disk file. */
private static List<Long> computeOnDiskChunkOffsets(
    int startMapIndex,
    int maxMapIndex,
    long fetchChunkSize,
    Map<Integer, List<ShuffleBlockInfo>> indexMap) {
  // NOTE(review): the distance check below assumes block offsets increase in visiting order,
  // i.e. the file is sorted by map index. TODO confirm with the sorter.
  List<Long> chunkOffsets = new ArrayList<>();
  ShuffleBlockInfo lastBlock = null;
  for (int i = startMapIndex; i < maxMapIndex; i++) {
    List<ShuffleBlockInfo> blockInfos = indexMap.get(i);
    if (blockInfos == null) {
      continue;
    }
    for (ShuffleBlockInfo info : blockInfos) {
      if (chunkOffsets.isEmpty()) {
        // The first visited block opens the first chunk.
        chunkOffsets.add(info.offset);
      }
      // Start a new chunk at this block once the current chunk spans at least fetchChunkSize.
      if (info.offset - chunkOffsets.get(chunkOffsets.size() - 1) >= fetchChunkSize) {
        chunkOffsets.add(info.offset);
      }
      lastBlock = info;
    }
  }
  if (lastBlock != null) {
    // Close the final chunk at the end of the last visited block.
    long endChunkOffset = lastBlock.length + lastBlock.offset;
    if (!chunkOffsets.contains(endChunkOffset)) {
      chunkOffsets.add(endChunkOffset);
    }
  }
  return chunkOffsets;
}