in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java [117:188]
/**
 * Builds a reader over a single commit log and, unless the log can be skipped,
 * reads it as part of construction.
 *
 * <p>The header is read when {@code readHeader} is set or when the effective start marker
 * is at position 0; otherwise only the segment-id based skip check is applied.
 * When the log is skipped, the skipped-commit-log count on {@code stats} is incremented by one.
 *
 * @param log                  commit log to read; a version and segment id must be extractable from it
 * @param tokenRange           optional token range, stored on the instance
 * @param markers              supplies the start marker for {@code log}; that marker is only used if its
 *                             segment id equals this log's segment id, otherwise the log's zero marker is used
 * @param partitionId          partition id, used here only as logging context
 * @param stats                stats sink, notified when this commit log is skipped
 * @param executor             optional executor, stored on the instance
 * @param listener             optional marker listener, stored on the instance
 * @param startTimestampMicros optional start timestamp in microseconds, stored on the instance
 * @param readHeader           forces the header to be read even if the start marker position is non-zero
 * @throws IllegalStateException if no version and segment id can be extracted from {@code log}
 * @throws RuntimeException      wrapping any {@link Throwable} raised while reading the header or the log,
 *                               unless {@code isNotFoundError} reports it as a not-found error
 */
public BufferingCommitLogReader(@NotNull CommitLog log,
                                @Nullable TokenRange tokenRange,
                                @NotNull CommitLogMarkers markers,
                                int partitionId,
                                @NotNull ICdcStats stats,
                                @Nullable AsyncExecutor executor,
                                @Nullable Consumer<Marker> listener,
                                @Nullable Long startTimestampMicros,
                                boolean readHeader)
{
    // Plain copies of the constructor arguments
    this.log = log;
    this.tokenRange = tokenRange;
    this.markers = markers;
    this.stats = stats;
    this.executor = executor;
    this.listener = listener;
    this.startTimestampMicros = startTimestampMicros;

    // Working state for reading
    this.statusTracker = new ReadStatusTracker(ALL_MUTATIONS, false);
    this.checksum = new CRC32();
    this.buffer = new byte[CdcRandomAccessReader.DEFAULT_BUFFER_SIZE];
    this.reader = BufferingCommitLogReader.reader(log);

    // Version and segment id are derived from the commit log itself
    Pair<Integer, Long> versionAndSegment = CommitLog.extractVersionAndSegmentId(log)
                                                     .orElseThrow(() -> new IllegalStateException("Could not extract segmentId from CommitLog filename"));
    this.messagingVersion = versionAndSegment.getLeft();
    this.segmentId = versionAndSegment.getRight();

    this.logger = new LoggerHelper(LoggerFactory.getLogger(BufferingCommitLogReader.class),
                                   "instance", log.instance().nodeName(),
                                   "dc", log.instance().dataCenter(),
                                   "log", log.name(),
                                   "size", log.maxOffset(),
                                   "segmentId", this.segmentId,
                                   "partitionId", partitionId);

    // Only honour the supplied marker when it points into this segment
    Marker candidateMarker = markers.startMarker(log);
    if (candidateMarker.segmentId() == segmentId)
    {
        this.startMarker = candidateMarker;
    }
    else
    {
        this.startMarker = log.zeroMarker();
    }

    logger.trace("Opening BufferingCommitLogReader");
    try
    {
        final boolean headerRequired = readHeader || this.startMarker.position() == 0;
        final boolean skippable;
        final String skipMessage;
        if (headerRequired)
        {
            this.readHeader();
            skippable = skip(this.startMarker);
            skipMessage = "Skipping commit log after reading header";
        }
        else
        {
            skippable = shouldSkipSegmentId(this.startMarker);
            skipMessage = "Skipping log";
        }

        if (skippable)
        {
            // nothing to read from this CommitLog, so record the skip and stop here
            logger.trace(skipMessage);
            stats.skippedCommitLogsCount(1);
            skipped = true;
            return;
        }
        read();
    }
    catch (Throwable t)
    {
        skipped = true;
        // not-found errors are tolerated silently; anything else is logged and rethrown wrapped
        if (!isNotFoundError(t))
        {
            logger.warn("Exception reading CommitLog", t);
            throw new RuntimeException(t);
        }
    }
}