in cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/scanner/CdcScannerBuilder.java [83:132]
public CdcScannerBuilder(CdcBridge cdcBridge,
int partitionId,
CdcOptions cdcOptions,
ICdcStats stats,
@Nullable TokenRange tokenRange,
@NotNull CdcState startState,
@NotNull AsyncExecutor executor,
boolean readCommitLogHeader,
@NotNull Map<CassandraInstance, List<CommitLog>> logs,
CassandraSource cassandraSource)
{
this.cdcBridge = cdcBridge;
this.cdcOptions = cdcOptions;
this.stats = stats;
this.tokenRange = tokenRange;
this.startState = startState;
this.executor = executor;
this.readCommitLogHeader = readCommitLogHeader;
this.startTimeNanos = System.nanoTime();
this.cassandraSource = cassandraSource;
this.partitionId = partitionId;
this.startTimestampMicroseconds = cdcOptions.minimumTimestampMicros();
LOGGER.debug("Opening CdcScanner " +
"numInstances={} startTimestampMicroseconds={} maxCommitLogsPerInstance={} partitionId={} samplingRate={} maxCdcState={}",
logs.size(),
startTimestampMicroseconds,
cdcOptions.maxCommitLogsPerInstance(),
partitionId,
cdcOptions.samplingRate(),
cdcOptions.maxCdcStateSize()
);
if (LOGGER.isTraceEnabled())
{
logs.values()
.stream()
.flatMap(Collection::stream)
.forEach(log -> LOGGER.trace("Opening CdcScanner to read log instance={} log={} len={} partitionId={} maxOffset={}",
log.instance().nodeName(), log.name(), log.length(), partitionId, log.maxOffset()));
}
this.futures = logs.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> openInstance(entry.getValue(),
cdcOptions.maxCommitLogsPerInstance(), startState.markers,
executor))
);
}