in fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java [1120:1223]
/**
 * Brings {@code writerStateManager} up to date with the log, ending at {@code lastOffset},
 * and takes writer snapshots along the way.
 *
 * <p>One of two paths is taken:
 *
 * <ul>
 *   <li><b>Fast path</b> - the manager reports no latest snapshot offset and {@code
 *       reloadFromCleanShutdown} is {@code true}. No segment is read; a snapshot is taken at
 *       the base offset of each of the last two segments (those that exist) and at {@code
 *       lastOffset}.
 *   <li><b>Reload path</b> - in every other case the manager is truncated and reloaded via
 *       {@code truncateAndReload}, the segments between the manager's map end offset and
 *       {@code lastOffset} are replayed when necessary, and a final snapshot is taken at
 *       {@code lastOffset}.
 * </ul>
 *
 * @param writerStateManager the writer state manager to rebuild; it is mutated by this call
 * @param segments the log segments that may be replayed
 * @param logStartOffset passed to {@code truncateAndReload} and used as a lower bound for the
 *     offset at which each segment replay starts
 * @param lastOffset the offset up to which writer state is rebuilt; the map end offset of the
 *     manager is set to this value before the final snapshot
 * @param reloadFromCleanShutdown when {@code true} and no snapshot exists, segment scanning is
 *     skipped entirely
 * @throws IOException propagated from the writer state manager or segment operations invoked
 *     here
 */
static void rebuildWriterState(
        WriterStateManager writerStateManager,
        LogSegments segments,
        long logStartOffset,
        long lastOffset,
        boolean reloadFromCleanShutdown)
        throws IOException {
    // Offsets at which snapshots are taken, in ascending order of interest: the base offset
    // of the second-to-last segment (if there is one), the base offset of the last segment,
    // and finally the last offset itself.
    List<Long> snapshotOffsets = new ArrayList<>();
    if (!segments.isEmpty()) {
        long tailBaseOffset = segments.lastSegment().get().getBaseOffset();
        segments.lowerSegment(tailBaseOffset)
                .map(LogSegment::getBaseOffset)
                .ifPresent(snapshotOffsets::add);
        snapshotOffsets.add(tailBaseOffset);
    }
    snapshotOffsets.add(lastOffset);

    LOG.info("Loading writer state till offset {}", lastOffset);

    // Scanning the log to build writer state is unnecessary when the tablet server is being
    // upgraded. The absence of writer snapshot files is used to detect that situation, but
    // only in combination with a clean shutdown, so that a tablet server failure is not
    // mistaken for an upgrade. In this case writer state loading is skipped and new
    // snapshots are written instead; the next reload will then start from those snapshots
    // (or later ones). Without a clean shutdown, a missing snapshot means the writer state
    // has to be rebuilt from the segments.
    boolean hasSnapshot = writerStateManager.latestSnapshotOffset().isPresent();
    if (!hasSnapshot && reloadFromCleanShutdown) {
        // Snapshotting at the start of the last two segments and at the last offset avoids
        // an expensive scan over all segments, including the case where the log later
        // needs truncation.
        for (long offset : snapshotOffsets) {
            writerStateManager.updateMapEndOffset(offset);
            writerStateManager.takeSnapshot();
        }
        return;
    }

    LOG.info(
            "Reloading from writer snapshot and rebuilding writer state from offset {}",
            lastOffset);

    // Must be captured before truncateAndReload, which may change both values.
    boolean wasEmptyBeforeTruncation =
            writerStateManager.isEmpty() && writerStateManager.mapEndOffset() >= lastOffset;

    long snapshotLoadStartMs = System.currentTimeMillis();
    writerStateManager.truncateAndReload(
            logStartOffset, lastOffset, System.currentTimeMillis());
    long segmentReplayStartMs = System.currentTimeMillis();

    // The potentially expensive replay is only needed when the loaded snapshot ends before
    // the last offset (e.g. on first startup) and there were active writers prior to
    // truncation (possible when truncating after the initial load). With no active writers,
    // truncation does not change that fact (although a writer id could expire earlier than
    // expected), so the replay can be skipped. This is an optimization for users that are
    // not using idempotent features yet.
    if (writerStateManager.mapEndOffset() < lastOffset && !wasEmptyBeforeTruncation) {
        // Segment containing lastOffset, or null when no such segment exists.
        LogSegment segmentHoldingLastOffset = segments.floorSegment(lastOffset).orElse(null);

        for (LogSegment segment :
                segments.values(writerStateManager.mapEndOffset(), lastOffset)) {
            long baseOffset = segment.getBaseOffset();

            // Never replay from before what is already loaded or before the log start.
            long replayFrom =
                    Math.max(
                            Math.max(baseOffset, writerStateManager.mapEndOffset()),
                            logStartOffset);
            writerStateManager.updateMapEndOffset(replayFrom);

            if (snapshotOffsets.contains(baseOffset)) {
                writerStateManager.takeSnapshot();
            }

            // Read the whole segment, except for the segment holding lastOffset, which is
            // read only up to the position of lastOffset when that position is known.
            int readLimit = segment.getSizeInBytes();
            if (segment == segmentHoldingLastOffset) {
                FileLogRecords.LogOffsetPosition lastOffsetPosition =
                        segment.translateOffset(lastOffset);
                if (lastOffsetPosition != null) {
                    readLimit = lastOffsetPosition.position;
                }
            }

            FetchDataInfo replayData =
                    segment.read(replayFrom, Integer.MAX_VALUE, readLimit, false);
            if (replayData != null) {
                loadWritersFromRecords(writerStateManager, replayData.getRecords());
            }
        }
    }

    writerStateManager.updateMapEndOffset(lastOffset);
    writerStateManager.takeSnapshot();

    long snapshotLoadMs = segmentReplayStartMs - snapshotLoadStartMs;
    long segmentReplayMs = System.currentTimeMillis() - segmentReplayStartMs;
    LOG.info(
            "Writer state recovery took {} ms for snapshot load and {} ms for segment recovery from offset {}",
            snapshotLoadMs,
            segmentReplayMs,
            lastOffset);
}