in common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java [62:162]
public List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult) {
if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
return Lists.newArrayList();
}
ByteBuffer indexData = shuffleIndexResult.getIndexData();
long dataFileLen = shuffleIndexResult.getDataFileLen();
List<BufferSegment> bufferSegments = Lists.newArrayList();
List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
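// bufferOffset: accumulated byte size of the segment currently being built
// fileOffset: data-file offset of the current segment's first block (-1 means no open segment)
// lastExpectedBlockIndex: index of the last block matching expectTaskIds, used to detect
// gaps between expected blocks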
int bufferOffset = 0;
long fileOffset = -1;
long totalLen = 0;
long lastExpectedBlockIndex = -1;
List<Long> indexTaskIds = new ArrayList<>();
/**
* One ShuffleDataSegment should meet the following requirements:
*
* <p>1. Every block's taskAttemptId is in the taskIds bitmap built from [startMapIndex,
* endMapIndex). Attention: the index in that range is not the map task id, which means the
* required task ids are not necessarily continuous.
*
* <p>2. The ShuffleDataSegment size should be < readBufferSize.
*
* <p>3. A single ShuffleDataSegment's blocks should be continuous in the index file.
*/
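// Illustrative example (hypothetical ids): with expectTaskIds = {t1, t2} and index records
// ordered [t1, t1, t3, t2], the unexpected t3 block breaks continuity, so the two t1 blocks
// form one segment and the t2 block starts another, even if all three expected blocks would
// fit within readBufferSize.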
int index = 0;
while (indexData.hasRemaining()) {
try {
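// Each index record is a fixed 40 bytes:
// offset(8) + length(4) + uncompressLength(4) + crc(8) + blockId(8) + taskAttemptId(8)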
long offset = indexData.getLong();
int length = indexData.getInt();
int uncompressLength = indexData.getInt();
long crc = indexData.getLong();
long blockId = indexData.getLong();
long taskAttemptId = indexData.getLong();
totalLen += length;
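// Record every task id seen in the index file; only used for the debug log below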
indexTaskIds.add(taskAttemptId);
// If the ShuffleServer is flushing the file at this time, the length recorded in the index
// file may be greater than the length of the actual data file. Return the segments collected
// so far in that case to avoid an EOFException when reading the data file.
if (dataFileLen != -1 && totalLen > dataFileLen) {
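// Decode the partition id for logging: in the blockId bit layout, the partition id sits
// directly above the task attempt id bits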
long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
LOGGER.info(
"Abort inconsistent data: the data length {} (bytes) recorded in the index file is greater "
+ "than the real data file length {} (bytes) for partition id {}. This can happen while "
+ "the data is still being flushed, in which case it can be safely ignored.",
totalLen,
dataFileLen,
Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
break;
}
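// Close the current segment when either (a) the current expected block is not adjacent to
// the previous expected block in the index file, or (b) the accumulated segment has reached
// the read buffer size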
boolean conditionOfDiscontinuousBlocks =
lastExpectedBlockIndex != -1
&& !bufferSegments.isEmpty()
&& expectTaskIds.contains(taskAttemptId)
&& index - lastExpectedBlockIndex != 1;
boolean conditionOfLimitedBufferSize = bufferOffset >= readBufferSize;
if (conditionOfDiscontinuousBlocks || conditionOfLimitedBufferSize) {
ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
dataFileSegments.add(sds);
bufferSegments = Lists.newArrayList();
bufferOffset = 0;
fileOffset = -1;
}
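// Only blocks from expected task ids are accumulated; fileOffset marks where this
// segment's data starts in the data file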
if (expectTaskIds.contains(taskAttemptId)) {
if (fileOffset == -1) {
fileOffset = offset;
}
bufferSegments.add(
new BufferSegment(
blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
bufferOffset += length;
lastExpectedBlockIndex = index;
}
index++;
} catch (BufferUnderflowException ue) {
throw new RssException("Read index data underflow", ue);
}
}
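// Flush the trailing segment that was still being accumulated when the index data ended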
if (bufferOffset > 0) {
ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
dataFileSegments.add(sds);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Index file task-ids sequence: {}, expected task-ids: {}",
indexTaskIds,
getExpectedTaskIds(expectTaskIds));
}
return dataFileSegments;
}
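// A minimal usage sketch (hypothetical caller; the constructor arguments shown here are
// assumed, and the real wiring lives in the shuffle read path):
//
// LocalOrderSegmentSplitter splitter =
//     new LocalOrderSegmentSplitter(expectTaskIds, readBufferSize);
// for (ShuffleDataSegment segment : splitter.split(shuffleIndexResult)) {
//   // read the segment's bytes from the data file at its recorded offset and length
// }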