public CdcScannerBuilder()

in cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/scanner/CdcScannerBuilder.java [83:132]


    /**
     * Creates the builder, captures its configuration and immediately calls {@code openInstance}
     * once per entry of {@code logs}, keeping the results keyed by instance in {@code futures}.
     *
     * <p>All fields are assigned before {@code openInstance} is invoked.
     * NOTE(review): {@code openInstance} is not visible here and may read those fields - keep the
     * assignments ahead of the call unless that is confirmed otherwise.
     *
     * @param cdcBridge           bridge stored for later use by this builder
     * @param partitionId         id stored on the builder and included in the log output
     * @param cdcOptions          options; supplies the minimum timestamp (micros) and the maximum number
     *                            of commit logs per instance used below
     * @param stats               stats sink stored for later use by this builder
     * @param tokenRange          optional token range, may be {@code null}
     * @param startState          starting CDC state; its {@code markers} are handed to {@code openInstance}
     * @param executor            executor handed to {@code openInstance}
     * @param readCommitLogHeader flag stored for later use by this builder
     * @param logs                commit logs grouped by the instance they belong to
     * @param cassandraSource     source stored for later use by this builder
     */
    public CdcScannerBuilder(CdcBridge cdcBridge,
                             int partitionId,
                             CdcOptions cdcOptions,
                             ICdcStats stats,
                             @Nullable TokenRange tokenRange,
                             @NotNull CdcState startState,
                             @NotNull AsyncExecutor executor,
                             boolean readCommitLogHeader,
                             @NotNull Map<CassandraInstance, List<CommitLog>> logs,
                             CassandraSource cassandraSource)
    {
        // Collaborators
        this.cdcBridge = cdcBridge;
        this.cassandraSource = cassandraSource;
        this.executor = executor;
        this.stats = stats;

        // Scan configuration
        this.partitionId = partitionId;
        this.tokenRange = tokenRange;
        this.cdcOptions = cdcOptions;
        this.startState = startState;
        this.readCommitLogHeader = readCommitLogHeader;

        // Timing
        this.startTimeNanos = System.nanoTime();
        this.startTimestampMicroseconds = cdcOptions.minimumTimestampMicros();

        LOGGER.debug("Opening CdcScanner " +
                     "numInstances={} startTimestampMicroseconds={} maxCommitLogsPerInstance={} partitionId={} samplingRate={} maxCdcState={}",
                     logs.size(),
                     startTimestampMicroseconds,
                     cdcOptions.maxCommitLogsPerInstance(),
                     partitionId,
                     cdcOptions.samplingRate(),
                     cdcOptions.maxCdcStateSize());

        // Per-log detail is only produced when trace logging is switched on
        if (LOGGER.isTraceEnabled())
        {
            for (List<CommitLog> instanceLogs : logs.values())
            {
                for (CommitLog commitLog : instanceLogs)
                {
                    LOGGER.trace("Opening CdcScanner to read log instance={} log={} len={} partitionId={} maxOffset={}",
                                 commitLog.instance().nodeName(),
                                 commitLog.name(),
                                 commitLog.length(),
                                 partitionId,
                                 commitLog.maxOffset());
                }
            }
        }

        // One openInstance call per Cassandra instance, keyed by that instance
        this.futures = logs.entrySet()
                           .stream()
                           .collect(Collectors.toMap(Map.Entry::getKey,
                                                     logsOfInstance -> openInstance(logsOfInstance.getValue(),
                                                                                    cdcOptions.maxCommitLogsPerInstance(),
                                                                                    startState.markers,
                                                                                    executor)));
    }