in cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/MicroBatchIterator.java [87:142]
/**
 * Creates an iterator over one micro-batch of CDC commit logs for a single partition.
 * <p>
 * The commit logs that {@code commitLogProvider} returns for {@code tokenRange} are grouped by the
 * Cassandra instance that produced them. For each keyspace from {@code keyspaceSupplier}, the number
 * of instances with commit logs is compared against {@link CdcOptions#minimumReplicas(String)}. If any
 * keyspace falls short, construction fails fast. Otherwise a {@link CdcScannerBuilder} is created and
 * its scanner is built eagerly.
 *
 * @param cdcBridge         passed through to the scanner builder
 * @param partitionId       passed to the scanner builder and included in trace logging
 * @param tokenRange        token range to read commit logs for; may be {@code null} (how {@code null} is
 *                          interpreted is up to {@code commitLogProvider} — presumably "no restriction", confirm)
 * @param startState        CDC state to start from; its size is reported via {@code stats.watermarkerSize}
 * @param cassandraSource   passed through to the scanner builder
 * @param keyspaceSupplier  supplies the keyspaces whose minimum-replica requirement is checked
 * @param cdcOptions        source of per-keyspace minimum replicas, plus the consistency level and DC
 *                          reported on failure
 * @param asyncExecutor     passed through to the scanner builder
 * @param commitLogProvider provides the commit logs for {@code tokenRange}
 * @param stats             stats sink, also passed through to the scanner builder
 * @throws NotEnoughReplicasException if, for some keyspace, fewer instances have commit logs than
 *                                    {@code cdcOptions.minimumReplicas(keyspace)} requires
 */
public MicroBatchIterator(CdcBridge cdcBridge,
                          int partitionId,
                          @Nullable TokenRange tokenRange,
                          CdcState startState,
                          CassandraSource cassandraSource,
                          Supplier<Set<String>> keyspaceSupplier,
                          CdcOptions cdcOptions,
                          AsyncExecutor asyncExecutor,
                          CommitLogProvider commitLogProvider,
                          ICdcStats stats) throws NotEnoughReplicasException
{
    stats.watermarkerSize(startState.size());
    this.tokenRange = tokenRange;

    // One map entry per instance that supplied at least one commit log for this range
    // (the single-argument groupingBy collects each group into a List by default).
    Map<CassandraInstance, List<CommitLog>> logsByInstance =
        commitLogProvider.logs(this.tokenRange)
                         .collect(Collectors.groupingBy(CommitLog::instance));
    int availableReplicas = logsByInstance.size();

    // Refuse to proceed when any keyspace lacks enough replicas. Carrying on (e.g. at RF=1) would pull
    // every mutation into the CDC state and hold it until another replica reappears, so the state could
    // grow without bound. Resuming later from the CommitLog offset is the safer option.
    Set<String> keyspaces = keyspaceSupplier.get();
    for (String keyspace : keyspaces)
    {
        int requiredReplicas = cdcOptions.minimumReplicas(keyspace);
        if (availableReplicas >= requiredReplicas)
        {
            continue;
        }

        // NOTE: this is also hit when there were no writes and therefore no commit logs to read.
        LOGGER.debug("Insufficient replicas available keyspace={} requiredReplicas={} availableReplicas={}",
                     keyspace, requiredReplicas, availableReplicas);
        throw new NotEnoughReplicasException(cdcOptions.consistencyLevel(),
                                             tokenRange != null ? tokenRange.lowerEndpoint() : null,
                                             tokenRange != null ? tokenRange.upperEndpoint() : null,
                                             requiredReplicas,
                                             availableReplicas,
                                             cdcOptions.dc());
    }

    this.builder = new CdcScannerBuilder(cdcBridge,
                                         partitionId,
                                         cdcOptions,
                                         stats,
                                         tokenRange,
                                         startState,
                                         asyncExecutor,
                                         false, // boolean flag whose meaning is defined by CdcScannerBuilder — TODO name it
                                         logsByInstance,
                                         cassandraSource);
    this.scanner = this.builder.build();

    if (LOGGER.isTraceEnabled())
    {
        // Log each marker of the scanner's end state, i.e. where the next epoch will resume.
        CdcState nextState = this.scanner.endState();
        nextState.markers.values().forEach(m ->
            LOGGER.trace("Next epoch marker epoch={} instance={} segmentId={} position={} partitionId={}",
                         nextState.epoch, m.instance().nodeName(), m.segmentId(), m.position(), partitionId));
    }
}