LogSegment roll()

in fluss-server/src/main/java/com/alibaba/fluss/server/log/LocalLog.java [470:549]


    /**
     * Rolls the log over to a new segment and returns that segment.
     *
     * <p>The base offset of the new segment is the larger of {@code expectedNextOffset} (treated
     * as 0 when absent) and the current local log end offset.
     *
     * <p>Three situations are distinguished:
     *
     * <ul>
     *   <li>A segment with that base offset is already loaded. If it is the active segment and it
     *       is empty, it is handed to {@code createAndDeleteSegment} together with the new offset
     *       and the segment returned by that call is the result. Otherwise the roll is rejected.
     *   <li>The new base offset is lower than the base offset of the active segment: the roll is
     *       rejected.
     *   <li>Otherwise any leftover log/offset-index/time-index files for the new offset are
     *       deleted, the current last segment (if there is one) is told it is becoming inactive,
     *       and a new segment is opened and added to {@code segments}.
     * </ul>
     *
     * @param expectedNextOffset lower bound for the base offset of the new segment; when empty,
     *     only the current local log end offset is considered
     * @return the newly created segment
     * @throws IOException if deleting a stale segment file fails, or if one of the invoked segment
     *     operations raises it
     * @throws FlussRuntimeException if a segment with the computed base offset already exists and
     *     is not an empty active segment, or if the computed base offset is lower than the base
     *     offset of the active segment
     */
    LogSegment roll(Optional<Long> expectedNextOffset) throws IOException {
        checkIfMemoryMappedBufferClosed();
        long newOffset = Math.max(expectedNextOffset.orElse(0L), getLocalLogEndOffset());
        File logFile = FlussPaths.logFile(logTabletDir, newOffset);
        LogSegment activeSegment = segments.activeSegment();
        if (segments.contains(newOffset)) {
            // segment with the same base offset already exists and loaded
            if (activeSegment.getBaseOffset() == newOffset && activeSegment.getSizeInBytes() == 0) {
                // We have seen this happen (see KAFKA-6388) after shouldRoll() returns
                // true for an active segment of size zero because one of the indexes is
                // "full" (due to _maxEntries == 0).
                LOG.warn(
                        "Trying to roll a new log segment with start offset "
                                + newOffset
                                + " =max(provided offset = "
                                + expectedNextOffset
                                + ", LEO = "
                                + getLocalLogEndOffset()
                                + ") while it already exists and is active with size 0."
                                + ", size of offset index: "
                                + activeSegment.offsetIndex().entries()
                                + ".");
                LogSegment newSegment =
                        createAndDeleteSegment(
                                newOffset, activeSegment, SegmentDeletionReason.LOG_ROLL);
                // Re-publish the unchanged log end offset after the segment swap.
                updateLogEndOffset(getLocalLogEndOffset());
                LOG.info("Rolled new log segment at offset " + newOffset);
                return newSegment;
            } else {
                throw new FlussRuntimeException(
                        "Trying to roll a new log segment for table bucket "
                                + tableBucket
                                + " with start offset "
                                + newOffset
                                + " =max(provided offset = "
                                + expectedNextOffset
                                + ", LEO = "
                                + getLocalLogEndOffset()
                                + ") while it already exists. Existing segment is "
                                + segments.get(newOffset)
                                + ".");
            }
        } else if (!segments.isEmpty() && newOffset < activeSegment.getBaseOffset()) {
            throw new FlussRuntimeException(
                    "Trying to roll a new log segment for table bucket "
                            + tableBucket
                            + " with start offset "
                            + newOffset
                            + " =max(provided offset = "
                            + expectedNextOffset
                            + ", LEO = "
                            + getLocalLogEndOffset()
                            + ") lower than start offset of the active segment "
                            + activeSegment
                            + ".");
        } else {
            File offsetIdxFile = FlussPaths.offsetIndexFile(logTabletDir, newOffset);
            File timeIndexFile = FlussPaths.timeIndexFile(logTabletDir, newOffset);
            // Files for the new base offset may be left on disk without a loaded segment;
            // remove them so the new segment starts from clean files.
            for (File file : Arrays.asList(logFile, offsetIdxFile, timeIndexFile)) {
                if (file.exists()) {
                    LOG.warn(
                            "Newly rolled segment file "
                                    + file.getAbsolutePath()
                                    + " already exists; deleting it first");
                    Files.delete(file.toPath());
                }
            }
            // The preceding branch explicitly allows for an empty segment list, so this
            // branch must not assume that a last segment exists.
            Optional<LogSegment> lastSegment = segments.lastSegment();
            if (lastSegment.isPresent()) {
                lastSegment.get().onBecomeInactiveSegment();
            }
        }

        LogSegment newSegment = LogSegment.open(logTabletDir, newOffset, config, logFormat);
        segments.add(newSegment);
        // We need to update the segment base offset and append position data of the
        // metadata when log rolls.
        // The next offset should not change.
        updateLogEndOffset(getLocalLogEndOffset());
        LOG.info("Rolled new log segment at offset " + newOffset);
        return newSegment;
    }