public CdcScannerBuilder()

in cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CdcScannerBuilder.java [95:140]


    public CdcScannerBuilder(int partitionId,
                             TableMetadata table,
                             Partitioner partitioner,
                             CommitLogProvider commitLogs,
                             Stats stats,
                             @Nullable SparkRangeFilter sparkRangeFilter,
                             @Nullable CdcOffsetFilter offsetFilter,
                             int minimumReplicasPerMutation,
                             @NotNull Watermarker jobWatermarker,
                             @NotNull String jobId,
                             @NotNull ExecutorService executorService,
                             @NotNull TimeProvider timeProvider)
    {
        this.table = table;
        this.partitioner = partitioner;
        this.stats = stats;
        this.sparkRangeFilter = sparkRangeFilter;
        this.offsetFilter = offsetFilter;
        this.watermarker = jobWatermarker.instance(jobId);
        Preconditions.checkArgument(minimumReplicasPerMutation >= 1,
                                    "minimumReplicasPerMutation should be at least 1");
        this.minimumReplicasPerMutation = minimumReplicasPerMutation;
        this.startTimeNanos = System.nanoTime();
        this.timeProvider = timeProvider;

        Map<CassandraInstance, List<CommitLog>> logs = commitLogs
                .logs()
                .collect(Collectors.groupingBy(CommitLog::instance, Collectors.toList()));
        Map<CassandraInstance, CommitLog.Marker> markers = logs.keySet().stream()
                .map(watermarker::highWaterMark)
                .filter(Objects::nonNull)
                .collect(Collectors.toMap(CommitLog.Marker::instance, Function.identity()));

        this.partitionId = partitionId;
        LOGGER.info("Opening CdcScanner numInstances={} start={} maxAgeMicros={} partitionId={} listLogsTimeNanos={}",
                    logs.size(),
                    offsetFilter != null ? offsetFilter.start().getTimestampMicros() : null,
                    offsetFilter != null ? offsetFilter.maxAgeMicros() : null,
                    partitionId,
                    System.nanoTime() - startTimeNanos);

        this.futures = logs.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> openInstanceAsync(entry.getValue(),
                                                                                        markers.get(entry.getKey()),
                                                                                        executorService)));
    }