static void rebuildWriterState()

in fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java [1120:1223]


    static void rebuildWriterState(
            WriterStateManager writerStateManager,
            LogSegments segments,
            long logStartOffset,
            long lastOffset,
            boolean reloadFromCleanShutdown)
            throws IOException {
        // Brings the writer state up to lastOffset. Two paths exist: a fast path that only
        // writes empty snapshots, and a reload path that truncates/reloads the state from
        // snapshots and then replays records from the segments.

        // Offsets at which a snapshot is wanted: the base offsets of the last two segments
        // (the older of the two may be absent) followed by lastOffset itself.
        List<Optional<Long>> snapshotOffsets = new ArrayList<>();
        if (!segments.isEmpty()) {
            long newestBaseOffset = segments.lastSegment().get().getBaseOffset();
            snapshotOffsets.add(
                    segments.lowerSegment(newestBaseOffset).map(LogSegment::getBaseOffset));
            snapshotOffsets.add(Optional.of(newestBaseOffset));
        }
        snapshotOffsets.add(Optional.of(lastOffset));
        LOG.info("Loading writer state till offset {}", lastOffset);

        // The absence of writer snapshot files is used to detect a tablet server upgrade, so
        // that the log is not scanned needlessly. Because tablet servers can fail, the absence
        // alone is not trusted: the shortcut is only taken when the log is also being reloaded
        // after a clean shutdown. In that case writer state loading is skipped and new
        // snapshots are written instead, so that the next reload can start from them.
        // Otherwise the writer state is rebuilt through the reload path below.
        boolean snapshotMissing = !writerStateManager.latestSnapshotOffset().isPresent();
        if (snapshotMissing && reloadFromCleanShutdown) {
            // Empty snapshots at the start of the last two segments and at the last offset
            // should avoid a full scan if the log later needs truncation.
            for (Optional<Long> candidate : snapshotOffsets) {
                if (!candidate.isPresent()) {
                    continue;
                }
                writerStateManager.updateMapEndOffset(candidate.get());
                writerStateManager.takeSnapshot();
            }
            return;
        }

        LOG.info(
                "Reloading from writer snapshot and rebuilding writer state from offset {}",
                lastOffset);
        // Must be captured before truncateAndReload, which may change the manager's state.
        boolean wasEmptyBeforeTruncation =
                writerStateManager.isEmpty() && writerStateManager.mapEndOffset() >= lastOffset;
        long snapshotLoadStartMs = System.currentTimeMillis();
        writerStateManager.truncateAndReload(
                logStartOffset, lastOffset, System.currentTimeMillis());
        long segmentReplayStartMs = System.currentTimeMillis();

        // Replaying segments is potentially expensive, so it only happens when the reloaded
        // state ends before lastOffset (as on first startup) and there were active writers
        // before truncation (possible when truncating after the initial load). With no writers
        // beforehand, truncation cannot introduce any (though it may cause a writer id to
        // expire earlier than expected), so the replay is skipped. This is an optimization for
        // users that are not yet using idempotent features.
        boolean replayNeeded =
                lastOffset > writerStateManager.mapEndOffset() && !wasEmptyBeforeTruncation;
        if (replayNeeded) {
            Optional<LogSegment> segmentOfLastOffset = segments.floorSegment(lastOffset);

            for (LogSegment segment :
                    segments.values(writerStateManager.mapEndOffset(), lastOffset)) {
                // Start at whichever is largest: the segment base offset, the current map end
                // offset, or the log start offset.
                long replayFrom =
                        Math.max(
                                logStartOffset,
                                Math.max(
                                        segment.getBaseOffset(),
                                        writerStateManager.mapEndOffset()));
                writerStateManager.updateMapEndOffset(replayFrom);

                if (snapshotOffsets.contains(Optional.of(segment.getBaseOffset()))) {
                    writerStateManager.takeSnapshot();
                }

                // Read the whole segment, except in the segment that holds lastOffset, where
                // the read is capped at the position lastOffset translates to (when one is
                // found).
                int readLimit = segment.getSizeInBytes();
                if (segment == segmentOfLastOffset.orElse(null)) {
                    FileLogRecords.LogOffsetPosition located =
                            segment.translateOffset(lastOffset);
                    if (located != null) {
                        readLimit = located.position;
                    }
                }

                FetchDataInfo fetched =
                        segment.read(replayFrom, Integer.MAX_VALUE, readLimit, false);
                if (fetched != null) {
                    loadWritersFromRecords(writerStateManager, fetched.getRecords());
                }
            }
        }

        // Always finish with the state positioned at lastOffset and a snapshot taken there.
        writerStateManager.updateMapEndOffset(lastOffset);
        writerStateManager.takeSnapshot();
        long snapshotLoadMs = segmentReplayStartMs - snapshotLoadStartMs;
        long segmentReplayMs = System.currentTimeMillis() - segmentReplayStartMs;
        LOG.info(
                "Writer state recovery took {} ms for snapshot load and {} ms for segment recovery from offset {}",
                snapshotLoadMs,
                segmentReplayMs,
                lastOffset);
    }