in cassandra-four-zero/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java [102:150]
public BufferingCommitLogReader(@NotNull TableMetadata table,
@Nullable CdcOffsetFilter offsetFilter,
@NotNull org.apache.cassandra.spark.cdc.CommitLog log,
@Nullable SparkRangeFilter sparkRangeFilter,
@Nullable org.apache.cassandra.spark.cdc.CommitLog.Marker highWaterMark,
int partitionId)
{
this.table = table;
this.offsetFilter = offsetFilter;
this.log = log;
this.updates = new ArrayList<>();
this.sparkRangeFilter = sparkRangeFilter;
this.statusTracker = new ReadStatusTracker(ALL_MUTATIONS, false);
this.checksum = new CRC32();
this.buffer = new byte[CdcRandomAccessReader.DEFAULT_BUFFER_SIZE];
this.reader = BufferingCommitLogReader.reader(log);
this.logger = new LoggerHelper(LoggerFactory.getLogger(BufferingCommitLogReader.class),
"instance", log.instance().nodeName(),
"dc", log.instance().dataCenter(),
"log", log.name(),
"size", log.maxOffset(),
"partitionId", partitionId);
this.highWaterMark = highWaterMark != null ? highWaterMark : log.zeroMarker();
try
{
readHeader();
if (skip())
{
// If we can skip this CommitLog, close immediately
close();
}
else
{
read();
}
}
catch (Throwable throwable)
{
close();
if (isNotFoundError(throwable))
{
return;
}
logger.warn("Exception reading CommitLog", throwable);
throw new RuntimeException(throwable);
}
}