in storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java [58:118]
/**
 * Groups the index entries of a shuffle data file into contiguous read ranges.
 *
 * <p>Each returned {@link DataFileSegment} covers a byte range of {@code path}, starting at the
 * offset of its first entry and ending at the end (offset + length) of its last entry. It carries
 * one {@link BufferSegment} per entry inside it, and each entry's offset is relative to the start
 * of that range. Gaps between entries that are not large enough to split the range are included
 * in its length.
 *
 * <p>A new range is started when the distance from the end of the previous entry to the offset of
 * the next entry exceeds {@code readBufferSize / 2}. A range is closed once its span reaches
 * {@code readBufferSize}. That size check runs after an entry has been added, so a range may end
 * up larger than {@code readBufferSize}.
 *
 * <p>When more than one segment is supplied, {@code segments} is sorted in place with
 * {@link Collections#sort(List)}. The caller's list must therefore be mutable, and it is left
 * reordered. NOTE(review): the merge presumably relies on the natural ordering of
 * {@code FileBasedShuffleSegment} being by offset — confirm against its {@code compareTo}.
 *
 * @param path file path recorded on every produced {@link DataFileSegment}
 * @param segments index entries to merge; {@code null} or an empty list yields an empty result
 * @param readBufferSize target size of a merged range; half of it is the gap that splits ranges
 * @return the merged ranges, in the order they were built
 */
public static List<DataFileSegment> mergeSegments(
    String path, List<FileBasedShuffleSegment> segments, int readBufferSize) {
  List<DataFileSegment> dataFileSegments = Lists.newArrayList();
  if (segments == null || segments.isEmpty()) {
    return dataFileSegments;
  }

  if (segments.size() == 1) {
    // Fast path: the lone entry becomes its own range, at relative offset 0 within it.
    FileBasedShuffleSegment only = segments.get(0);
    List<BufferSegment> singleBuffer = Lists.newArrayList();
    singleBuffer.add(
        new BufferSegment(
            only.getBlockId(),
            0,
            only.getLength(),
            only.getUncompressLength(),
            only.getCrc(),
            only.getTaskAttemptId()));
    dataFileSegments.add(
        new DataFileSegment(path, only.getOffset(), only.getLength(), singleBuffer));
    return dataFileSegments;
  }

  Collections.sort(segments);
  long gapLimit = readBufferSize / 2;
  // -1 means "no range is currently open".
  long rangeStart = -1;
  // End of the most recently consumed entry. The initial value is not read before the first
  // entry overwrites it, because the gap check short-circuits while no range is open.
  long prevEnd = Long.MAX_VALUE;
  List<BufferSegment> pending = Lists.newArrayList();

  for (FileBasedShuffleSegment entry : segments) {
    long entryOffset = entry.getOffset();

    // A large hole, e.g. [20, 100] followed by [1000, 1001], leaves [101, 999] unread. Instead
    // of reading the hole, close the open range here and start a fresh one at this entry.
    boolean rangeOpen = rangeStart > -1;
    if (rangeOpen && entryOffset - prevEnd > gapLimit) {
      dataFileSegments.add(
          new DataFileSegment(path, rangeStart, (int) (prevEnd - rangeStart), pending));
      rangeStart = -1;
    }
    if (rangeStart == -1) {
      pending = Lists.newArrayList();
      rangeStart = entryOffset;
    }

    long entryEnd = entryOffset + entry.getLength();
    pending.add(
        new BufferSegment(
            entry.getBlockId(),
            entryOffset - rangeStart,
            entry.getLength(),
            entry.getUncompressLength(),
            entry.getCrc(),
            entry.getTaskAttemptId()));
    prevEnd = entryEnd;

    // The size is checked only after the entry is added, so a range can exceed readBufferSize.
    if (entryEnd - rangeStart >= readBufferSize) {
      dataFileSegments.add(
          new DataFileSegment(path, rangeStart, (int) (entryEnd - rangeStart), pending));
      rangeStart = -1;
    }
  }

  // Flush whatever range is still open after the last entry.
  if (rangeStart > -1) {
    dataFileSegments.add(
        new DataFileSegment(path, rangeStart, (int) (prevEnd - rangeStart), pending));
  }
  return dataFileSegments;
}