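public MicroBatchIterator(CdcBridge, int, TokenRange, CdcState, CassandraSource, Supplier<Set<String>>, CdcOptions, AsyncExecutor, CommitLogProvider, ICdcStats)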

in cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/MicroBatchIterator.java [87:142]


    /**
     * Builds an iterator over one micro-batch of CDC mutations for the given partition.
     * <p>
     * The commit logs covering {@code tokenRange} are obtained from {@code commitLogProvider} and grouped by the
     * {@link CassandraInstance} that owns them. Before any scanner is built, every keyspace returned by
     * {@code keyspaceSupplier} is checked: if fewer instances have commit logs than
     * {@link CdcOptions#minimumReplicas(String)} requires for that keyspace, construction fails with
     * {@link NotEnoughReplicasException} instead of proceeding.
     *
     * @param cdcBridge         bridge handed to the {@link CdcScannerBuilder}
     * @param partitionId       partition identifier; handed to the scanner builder and included in trace output
     * @param tokenRange        token range whose commit logs are read; may be {@code null}
     * @param startState        CDC state to start from; its size is reported to {@code stats} first
     * @param cassandraSource   source handed to the {@link CdcScannerBuilder}
     * @param keyspaceSupplier  supplies the keyspaces whose minimum-replica requirement is enforced
     * @param cdcOptions        supplies per-keyspace minimum replicas, plus consistency level and DC for errors
     * @param asyncExecutor     executor handed to the {@link CdcScannerBuilder}
     * @param commitLogProvider source of the commit logs for {@code tokenRange}
     * @param stats             stats sink; receives the watermarker size and is handed to the scanner builder
     * @throws NotEnoughReplicasException if, for any keyspace, fewer instances have commit logs than required
     */
    public MicroBatchIterator(CdcBridge cdcBridge,
                              int partitionId,
                              @Nullable TokenRange tokenRange,
                              CdcState startState,
                              CassandraSource cassandraSource,
                              Supplier<Set<String>> keyspaceSupplier,
                              CdcOptions cdcOptions,
                              AsyncExecutor asyncExecutor,
                              CommitLogProvider commitLogProvider,
                              ICdcStats stats) throws NotEnoughReplicasException
    {
        stats.watermarkerSize(startState.size());
        this.tokenRange = tokenRange;

        // One entry per instance that contributed at least one commit log for this token range.
        Map<CassandraInstance, List<CommitLog>> logsByInstance =
            commitLogProvider.logs(tokenRange).collect(Collectors.groupingBy(CommitLog::instance));

        // Refuse to start when replicas are missing: otherwise (e.g. at RF=1) every mutation read would sit in
        // the CDC state until another replica returns, so the state could grow without bound. It is better to
        // stop here and resume from the CommitLog offset once enough replicas are back.
        verifyReplicaAvailability(keyspaceSupplier.get(), cdcOptions, tokenRange, logsByInstance.size());

        // NOTE(review): meaning of the literal `false` argument is defined by CdcScannerBuilder — confirm there.
        this.builder = new CdcScannerBuilder(cdcBridge, partitionId, cdcOptions, stats, tokenRange, startState,
                                             asyncExecutor, false, logsByInstance, cassandraSource);
        this.scanner = builder.build();

        if (!LOGGER.isTraceEnabled())
        {
            return;
        }

        // Trace where the next epoch will pick up for each instance marker in the scanner's end state.
        CdcState nextState = scanner.endState();
        nextState.markers.values()
                         .forEach(m -> LOGGER.trace("Next epoch marker epoch={} instance={} segmentId={} position={} partitionId={}",
                                                    nextState.epoch, m.instance().nodeName(), m.segmentId(), m.position(), partitionId));
    }

    /**
     * Fails fast when any keyspace requires more replicas than there are instances with commit logs.
     *
     * @param keyspaces         keyspaces to check, in iteration order; the first failing one is reported
     * @param cdcOptions        supplies the per-keyspace minimum, consistency level and DC
     * @param tokenRange        token range being read; may be {@code null}, in which case no bounds are reported
     * @param availableReplicas number of instances that have commit logs for {@code tokenRange}
     * @throws NotEnoughReplicasException for the first keyspace whose minimum exceeds {@code availableReplicas}
     */
    private void verifyReplicaAvailability(Set<String> keyspaces,
                                           CdcOptions cdcOptions,
                                           @Nullable TokenRange tokenRange,
                                           int availableReplicas) throws NotEnoughReplicasException
    {
        for (String keyspace : keyspaces)
        {
            int requiredReplicas = cdcOptions.minimumReplicas(keyspace);
            if (requiredReplicas <= availableReplicas)
            {
                continue;
            }

            //NOTE: this can happen when there are no writes and no commit logs to read
            LOGGER.debug("Insufficient replicas available keyspace={} requiredReplicas={} availableReplicas={}",
                         keyspace, requiredReplicas, availableReplicas);
            throw new NotEnoughReplicasException(cdcOptions.consistencyLevel(),
                                                 tokenRange == null ? null : tokenRange.lowerEndpoint(),
                                                 tokenRange == null ? null : tokenRange.upperEndpoint(),
                                                 requiredReplicas,
                                                 availableReplicas,
                                                 cdcOptions.dc());
        }
    }