in cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/scanner/CdcScannerBuilder.java [254:321]
public CdcStreamScanner build()
{
    // block on futures to read all CommitLog mutations and collect CDC updates
    List<PartitionUpdateWrapper> updates =
        futures.values()
               .stream()
               .map(future -> FutureUtils.await(future, throwable -> LOGGER.warn("Failed to read instance with error", throwable)))
               .filter(FutureUtils.FutureResult::isSuccess)
               .map(FutureUtils.FutureResult::value)
               .filter(Objects::nonNull)
               .flatMap(Collection::stream)
               .flatMap(f -> f.updates().stream())
               .collect(Collectors.toList());
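    // record the total number of mutations read across all instances in this batch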
    stats.mutationsReadPerBatch(updates.size());
    // begin mutating the start state
    CdcState.Mutator stateMutator = startState.mutate();
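    // filter the raw updates against the mutating CDC state, recording the time spent filtering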
    Collection<PartitionUpdateWrapper> filteredUpdates = reportTimeTaken(() -> filterValidUpdates(updates, stateMutator),
                                                                         stats::mutationsFilterTime);
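    // report the age of each change: wall-clock time now minus the update's max write timestamp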
    long now = System.currentTimeMillis();
    filteredUpdates.forEach(update -> stats.changeReceived(update.keyspace(),
                                                           update.table(),
                                                           now - TimeUnit.MICROSECONDS.toMillis(update.maxTimestampMicros()))
    );
    if (stateMutator.isFull(cdcOptions))
    {
        int cdcStateSize = stateMutator.size();
        // we don't want the CDC state to grow indefinitely; it indicates something is wrong, so fail
        futures.clear();
        LOGGER.error("Watermarker has exceeded max permitted size watermarkerSize={} maxCdcStateSize={}", cdcStateSize, cdcOptions.maxCdcStateSize());
        stats.watermarkerExceededSize(cdcStateSize);
        throw new RuntimeException("Watermark state has exceeded max permitted size: " + cdcStateSize);
    }
    // update CDC state with new marker positions
    // only advance an instance's marker if all of its logs were read without errors
    futures.forEach((instance, future) -> {
        if (!future.isCompletedExceptionally())
        {
            future.join()
                  .stream()
                  .map(CommitLogReader.Result::marker)
                  .max(Marker::compareTo)
                  .ifPresent(marker -> stateMutator.advanceMarker(instance, marker));
        }
    });
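    // all results have been consumed; release the futures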
    futures.clear();
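    // total time taken to read this batch (measured from startTimeNanos)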
    long timeTakenToReadBatch = System.nanoTime() - startTimeNanos;
    LOGGER.debug("Processed CdcScanner startTimestampMicroseconds={} partitionId={} timeNanos={} updates={}",
                 startTimestampMicroseconds,
                 partitionId,
                 timeTakenToReadBatch,
                 updates.size()
    );
    stats.mutationsBatchReadTime(timeTakenToReadBatch);
    // hand updates over to CdcSortedStreamScanner
    // build new end state, purging expired mutations to prevent the state from growing indefinitely
    CdcState endState = stateMutator
                        .nextEpoch()
                        .withRange(tokenRange)
                        .purge(stats, startTimestampMicroseconds)
                        .build();
    return buildStreamScanner(filteredUpdates, endState);
}