public BufferingCommitLogReader()

in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java [117:188]


    /**
     * Builds a reader over a single commit log and, unless the log is found to be skippable,
     * reads it before the constructor returns.
     *
     * <p>The version and segment id are extracted from {@code log}. The effective start marker is
     * the one supplied by {@code markers} when it belongs to this segment, otherwise the log's
     * zero marker. The header is read when {@code readHeader} is set or when the effective start
     * marker is at position 0; in that case {@code skip(...)} decides whether the log is skipped,
     * otherwise {@code shouldSkipSegmentId(...)} decides. A skipped log increments
     * {@code stats.skippedCommitLogsCount} and sets {@code skipped}.
     *
     * <p>Any {@link Throwable} raised while reading sets {@code skipped}. If
     * {@code isNotFoundError} accepts it the constructor returns normally; otherwise it is logged
     * and rethrown wrapped in a {@link RuntimeException}.
     *
     * @param log                  commit log to read
     * @param tokenRange           token range stored on the reader; not consulted in this constructor
     * @param markers              source of the start marker for {@code log}
     * @param partitionId          partition id; used here only as logging context
     * @param stats                stats sink, notified when the log is skipped
     * @param executor             executor stored on the reader; not consulted in this constructor
     * @param listener             marker listener stored on the reader; not consulted in this constructor
     * @param startTimestampMicros start timestamp stored on the reader; not consulted in this constructor
     * @param readHeader           when {@code true}, the header is read even if the start marker
     *                             position is non-zero
     * @throws IllegalStateException if the version and segment id cannot be extracted from {@code log}
     * @throws RuntimeException      wrapping any failure during reading that is not a not-found error
     */
    public BufferingCommitLogReader(@NotNull CommitLog log,
                                    @Nullable TokenRange tokenRange,
                                    @NotNull CommitLogMarkers markers,
                                    int partitionId,
                                    @NotNull ICdcStats stats,
                                    @Nullable AsyncExecutor executor,
                                    @Nullable Consumer<Marker> listener,
                                    @Nullable Long startTimestampMicros,
                                    boolean readHeader)
    {
        // State handed in by the caller, stored verbatim
        this.log = log;
        this.tokenRange = tokenRange;
        this.markers = markers;
        this.stats = stats;
        this.executor = executor;
        this.listener = listener;
        this.startTimestampMicros = startTimestampMicros;

        // State created by this reader
        this.statusTracker = new ReadStatusTracker(ALL_MUTATIONS, false);
        this.checksum = new CRC32();
        this.buffer = new byte[CdcRandomAccessReader.DEFAULT_BUFFER_SIZE];
        this.reader = BufferingCommitLogReader.reader(log);

        Pair<Integer, Long> versionAndSegmentId =
        CommitLog.extractVersionAndSegmentId(log)
                 .orElseThrow(() -> new IllegalStateException("Could not extract segmentId from CommitLog filename"));
        this.messagingVersion = versionAndSegmentId.getLeft();
        this.segmentId = versionAndSegmentId.getRight();

        // Every message logged through this helper carries the context below
        this.logger = new LoggerHelper(LoggerFactory.getLogger(BufferingCommitLogReader.class),
                                       "instance", log.instance().nodeName(),
                                       "dc", log.instance().dataCenter(),
                                       "log", log.name(),
                                       "size", log.maxOffset(),
                                       "segmentId", this.segmentId,
                                       "partitionId", partitionId);

        // A marker from a different segment is ignored in favour of the log's zero marker
        Marker candidate = markers.startMarker(log);
        final Marker effectiveStart;
        if (candidate.segmentId() == segmentId)
        {
            effectiveStart = candidate;
        }
        else
        {
            effectiveStart = log.zeroMarker();
        }
        this.startMarker = effectiveStart;

        logger.trace("Opening BufferingCommitLogReader");
        try
        {
            final boolean headerNeeded = readHeader || this.startMarker.position() == 0;
            final boolean skippable;
            if (headerNeeded)
            {
                this.readHeader();
                skippable = skip(this.startMarker);
            }
            else
            {
                skippable = shouldSkipSegmentId(this.startMarker);
            }

            if (!skippable)
            {
                read();
                return;
            }

            // Nothing to read from this CommitLog: record the skip and stop here
            logger.trace(headerNeeded ? "Skipping commit log after reading header" : "Skipping log");
            stats.skippedCommitLogsCount(1);
            skipped = true;
        }
        catch (Throwable throwable)
        {
            skipped = true;
            if (!isNotFoundError(throwable))
            {
                logger.warn("Exception reading CommitLog", throwable);
                throw new RuntimeException(throwable);
            }
            // not-found errors are tolerated: the reader is left marked as skipped
        }
    }