in cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CdcScannerBuilder.java [95:140]
/**
 * Creates a builder that captures the supplied collaborators, groups the provided commit logs
 * by Cassandra instance and submits one {@code openInstanceAsync} call per instance.
 *
 * @param partitionId                id stored on the builder and included in the opening log line
 * @param table                      table metadata stored on the builder
 * @param partitioner                partitioner stored on the builder
 * @param commitLogs                 source of the commit logs that are grouped per instance
 * @param stats                      stats object stored on the builder
 * @param sparkRangeFilter           optional range filter, may be {@code null}
 * @param offsetFilter               optional offset filter, may be {@code null}; when present its start
 *                                   timestamp and max age are included in the opening log line
 * @param minimumReplicasPerMutation must be at least 1
 * @param jobWatermarker             watermarker from which the job-specific instance is obtained
 * @param jobId                      id passed to {@code jobWatermarker.instance(...)}
 * @param executorService            executor handed to {@code openInstanceAsync}
 * @param timeProvider               time provider stored on the builder
 * @throws IllegalArgumentException if {@code minimumReplicasPerMutation} is less than 1
 */
public CdcScannerBuilder(int partitionId,
                         TableMetadata table,
                         Partitioner partitioner,
                         CommitLogProvider commitLogs,
                         Stats stats,
                         @Nullable SparkRangeFilter sparkRangeFilter,
                         @Nullable CdcOffsetFilter offsetFilter,
                         int minimumReplicasPerMutation,
                         @NotNull Watermarker jobWatermarker,
                         @NotNull String jobId,
                         @NotNull ExecutorService executorService,
                         @NotNull TimeProvider timeProvider)
{
    // Plain state capture; none of these assignments call into other code
    this.partitionId = partitionId;
    this.table = table;
    this.partitioner = partitioner;
    this.stats = stats;
    this.timeProvider = timeProvider;
    this.sparkRangeFilter = sparkRangeFilter;
    this.offsetFilter = offsetFilter;

    // The job-scoped watermarker is resolved before the replica-count argument is validated
    this.watermarker = jobWatermarker.instance(jobId);

    Preconditions.checkArgument(minimumReplicasPerMutation >= 1,
                                "minimumReplicasPerMutation should be at least 1");
    this.minimumReplicasPerMutation = minimumReplicasPerMutation;

    // The clock starts before the logs are listed so the log line below can report the listing time
    this.startTimeNanos = System.nanoTime();

    Map<CassandraInstance, List<CommitLog>> logsByInstance =
            commitLogs.logs().collect(Collectors.groupingBy(CommitLog::instance));

    // Instances for which the watermarker returns null simply have no entry in this map
    Map<CassandraInstance, CommitLog.Marker> highWaterMarks =
            logsByInstance.keySet()
                          .stream()
                          .map(watermarker::highWaterMark)
                          .filter(marker -> marker != null)
                          .collect(Collectors.toMap(marker -> marker.instance(), marker -> marker));

    // Both values are logged as null when no offset filter was supplied
    Object startMicros = offsetFilter == null ? null : offsetFilter.start().getTimestampMicros();
    Object maxAgeMicros = offsetFilter == null ? null : offsetFilter.maxAgeMicros();
    LOGGER.info("Opening CdcScanner numInstances={} start={} maxAgeMicros={} partitionId={} listLogsTimeNanos={}",
                logsByInstance.size(),
                startMicros,
                maxAgeMicros,
                partitionId,
                System.nanoTime() - startTimeNanos);

    // One openInstanceAsync call per instance; the marker argument is null when none was found above
    this.futures = logsByInstance.entrySet()
                                 .stream()
                                 .collect(Collectors.toMap(
                                         perInstance -> perInstance.getKey(),
                                         perInstance -> openInstanceAsync(perInstance.getValue(),
                                                                          highWaterMarks.get(perInstance.getKey()),
                                                                          executorService)));
}