in common/src/main/java/org/apache/uniffle/common/segment/AbstractSegmentSplitter.java [43:136]
/**
 * Splits the entries of a shuffle index into {@link ShuffleDataSegment}s.
 *
 * <p>Each index entry is read as: offset (long), length (int), uncompressed length (int), crc
 * (long), block id (long), task attempt id (long). Consecutive accepted entries are grouped into
 * one segment. A new segment starts when any of the following happens:
 *
 * <ul>
 *   <li>the buffered length reaches {@code readBufferSize};
 *   <li>the resolved storage id changes;
 *   <li>an entry is rejected by {@code taskFilter}.
 * </ul>
 *
 * <p>When {@code storageIds} is non-empty, an entry whose offset is smaller than the previous
 * entry's offset is treated as switching to the next storage id in the array. When it is empty,
 * every segment is given storage id {@code -1}.
 *
 * <p>If the running total of lengths recorded in the index exceeds the data file length (when
 * that length is known, i.e. not {@code -1}), splitting stops. The segments collected so far are
 * returned.
 *
 * @param shuffleIndexResult the index data to split; {@code null} or empty yields an empty list
 * @param taskFilter optional filter on task attempt ids; {@code null} accepts every entry
 * @return the data segments in index order
 * @throws RssException if the index data is truncated, or if it switches storage more times than
 *     {@code storageIds} provides entries for
 */
protected List<ShuffleDataSegment> splitCommon(
    ShuffleIndexResult shuffleIndexResult, Predicate<Long> taskFilter) {
  if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
    return Lists.newArrayList();
  }
  ByteBuffer indexData = shuffleIndexResult.getIndexData();
  long dataFileLen = shuffleIndexResult.getDataFileLen();
  int[] storageIds = shuffleIndexResult.getStorageIds();
  List<BufferSegment> bufferSegments = Lists.newArrayList();
  List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
  int bufferOffset = 0;
  long fileOffset = -1;
  long totalLength = 0;
  int storageIndex = 0;
  long preOffset = -1;
  // Storage id of the entries currently buffered in bufferSegments (-1 until one is accepted).
  int preStorageId = -1;
  while (indexData.hasRemaining()) {
    try {
      final long offset = indexData.getLong();
      final int length = indexData.getInt();
      final int uncompressLength = indexData.getInt();
      final long crc = indexData.getLong();
      final long blockId = indexData.getLong();
      final long taskAttemptId = indexData.getLong();

      final int currentStorageId;
      if (storageIds.length == 0) {
        currentStorageId = -1;
      } else {
        if (preOffset > offset) {
          // An offset going backwards is treated as moving on to the next storage id.
          storageIndex++;
          if (storageIndex >= storageIds.length) {
            // Fail with a descriptive error instead of an ArrayIndexOutOfBoundsException.
            throw new RssException(
                "storageIds length "
                    + storageIds.length
                    + " is not enough, index data references at least "
                    + (storageIndex + 1)
                    + " storages.");
          }
        }
        currentStorageId = storageIds[storageIndex];
      }
      preOffset = offset;

      totalLength += length;
      if (dataFileLen != -1 && totalLength > dataFileLen) {
        LOGGER.info(
            "Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
                + "the real data file length: {}(bytes). Block id: {}. "
                + "This may happen when the data is flushing, please ignore.",
            totalLength,
            dataFileLen,
            blockId);
        break;
      }

      // Evaluate the filter once per entry; a null filter accepts everything.
      boolean accepted = taskFilter == null || taskFilter.test(taskAttemptId);
      boolean storageChanged = preStorageId != -1 && currentStorageId != preStorageId;
      // Close the pending segment when it is full, when the storage switches (a segment must
      // map to a single storage), or when a rejected entry breaks its contiguity.
      if (bufferOffset > 0 && (bufferOffset >= readBufferSize || storageChanged || !accepted)) {
        dataFileSegments.add(
            new ShuffleDataSegment(fileOffset, bufferOffset, preStorageId, bufferSegments));
        bufferSegments = Lists.newArrayList();
        bufferOffset = 0;
        fileOffset = -1;
      }

      if (accepted) {
        if (fileOffset == -1) {
          fileOffset = offset;
        }
        bufferSegments.add(
            new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
        preStorageId = currentStorageId;
        bufferOffset += length;
      }
    } catch (BufferUnderflowException ue) {
      throw new RssException("Read index data under flow", ue);
    }
  }
  if (bufferOffset > 0) {
    // Use the storage id of the buffered entries. If the loop stopped on the length check
    // right after a storage switch, the last resolved id would belong to the next storage.
    dataFileSegments.add(
        new ShuffleDataSegment(fileOffset, bufferOffset, preStorageId, bufferSegments));
  }
  return dataFileSegments;
}