in fluss-server/src/main/java/com/alibaba/fluss/server/log/LocalLog.java [470:549]
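/**
 * Rolls the log over to a new active segment whose base offset is
 * {@code max(expectedNextOffset, localLogEndOffset)} and returns the newly created segment.
 * The previously active segment, if any, becomes inactive.
 */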
LogSegment roll(Optional<Long> expectedNextOffset) throws IOException {
checkIfMemoryMappedBufferClosed();
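// The new segment starts at the larger of the caller-provided offset and the current
// local log end offset, so a roll never creates a segment behind the LEO.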
long newOffset = Math.max(expectedNextOffset.orElse(0L), getLocalLogEndOffset());
File logFile = FlussPaths.logFile(logTabletDir, newOffset);
LogSegment activeSegment = segments.activeSegment();
if (segments.contains(newOffset)) {
// a segment with the same base offset already exists and is loaded
if (activeSegment.getBaseOffset() == newOffset && activeSegment.getSizeInBytes() == 0) {
// We have seen this happen (see KAFKA-6388) after shouldRoll() returns
// true for an active segment of size zero because one of the indexes is
// "full" (due to _maxEntries == 0).
LOG.warn(
"Trying to roll a new log segment with start offset "
+ newOffset
+ " =max(provided offset = "
+ expectedNextOffset
+ ", LEO = "
+ getLocalLogEndOffset()
+ ") while it already exists and is active with size 0."
+ ", size of offset index: "
+ activeSegment.offsetIndex().entries()
+ ".");
LogSegment newSegment =
createAndDeleteSegment(
newOffset, activeSegment, SegmentDeletionReason.LOG_ROLL);
updateLogEndOffset(getLocalLogEndOffset());
LOG.info("Rolled new log segment at offset " + newOffset);
return newSegment;
} else {
throw new FlussRuntimeException(
"Trying to roll a new log segment for table bucket "
+ tableBucket
+ " with start offset "
+ newOffset
+ " =max(provided offset = "
+ expectedNextOffset
+ ", LEO = "
+ getLocalLogEndOffset()
+ ") while it already exists. Existing segment is "
+ segments.get(newOffset)
+ ".");
}
} else if (!segments.isEmpty() && newOffset < activeSegment.getBaseOffset()) {
throw new FlussRuntimeException(
"Trying to roll a new log segment for table bucket "
+ tableBucket
+ " with start offset "
+ newOffset
+ " =max(provided offset = "
+ expectedNextOffset
+ ", LEO = "
+ getLocalLogEndOffset()
+ ") lower than start offset of the active segment "
+ activeSegment
+ ".");
} else {
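// Normal roll path: the target base offset does not exist yet, so remove any stale
// segment or index files left behind by a previous incomplete roll before opening it.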
File offsetIdxFile = FlussPaths.offsetIndexFile(logTabletDir, newOffset);
File timeIndexFile = FlussPaths.timeIndexFile(logTabletDir, newOffset);
for (File file : Arrays.asList(logFile, offsetIdxFile, timeIndexFile)) {
if (file.exists()) {
LOG.warn(
"Newly rolled segment file "
+ file.getAbsolutePath()
+ " already exists; deleting it first");
Files.delete(file.toPath());
}
}
// Seal the previous last segment (if any) so it can finalize its index files
// before the newly rolled segment becomes active.
Optional<LogSegment> lastSegment = segments.lastSegment();
if (lastSegment.isPresent()) {
    lastSegment.get().onBecomeInactiveSegment();
}
}
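// Open the new segment and register it; having the highest base offset, it becomes the
// active segment.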
LogSegment newSegment = LogSegment.open(logTabletDir, newOffset, config, logFormat);
segments.add(newSegment);
// When the log rolls we need to refresh the log end offset metadata so that its
// segment base offset and append position point at the new active segment;
// the next offset itself does not change.
updateLogEndOffset(getLocalLogEndOffset());
LOG.info("Rolled new log segment at offset " + newOffset);
return newSegment;
}