in cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CdcScannerBuilder.java [424:466]
/**
 * Decides whether a group of partition updates should be published in the current batch.
 *
 * <p>The first element of {@code updates} is used as the representative for all watermarker and
 * stats calls. The number of copies considered is {@code updates.size()} plus whatever count the
 * watermarker already holds for that representative.
 *
 * @param updates                    non-empty list of updates; presumably replica copies of the
 *                                   same mutation (confirm against the caller)
 * @param minimumReplicasPerMutation minimum combined copy count required before publishing
 * @param watermarker                consulted for the previously recorded count and for whether an
 *                                   update was seen before; updated when the count is insufficient
 *                                   or when a previously seen update is finally published
 * @param stats                      receives exactly one callback describing the outcome
 * @return {@code false} when the combined count is below {@code minimumReplicasPerMutation},
 *         {@code true} otherwise
 * @throws IllegalStateException if {@code updates} is empty
 */
static boolean filter(List<PartitionUpdateWrapper> updates,
                      int minimumReplicasPerMutation,
                      Watermarker watermarker,
                      Stats stats)
{
    if (updates.isEmpty())
    {
        throw new IllegalStateException("Should not received empty list of updates");
    }

    final PartitionUpdateWrapper representative = updates.get(0);
    final PartitionUpdate representativePartition = representative.partitionUpdate();
    // Copies received in this round plus copies the watermarker already recorded
    final int replicasSoFar = updates.size() + watermarker.replicaCount(representative);

    if (replicasSoFar < minimumReplicasPerMutation)
    {
        // Not enough copies yet: remember how many we have and retry on a later round
        Object keyForLog = "unknown";
        if (representativePartition != null)
        {
            keyForLog = representativePartition.partitionKey();
        }
        LOGGER.warn("Ignore the partition update (partition key: '{}') for this batch "
                    + "due to insufficient replicas received. {} required {} received.",
                    keyForLog, minimumReplicasPerMutation, replicasSoFar);
        watermarker.recordReplicaCount(representative, replicasSoFar);
        stats.insufficientReplicas(representative, updates.size(), minimumReplicasPerMutation);
        return false;
    }

    // Enough copies to publish; check whether any of them was held back earlier.
    // Stops at the first match, in list order.
    boolean heldBackEarlier = false;
    for (PartitionUpdateWrapper candidate : updates)
    {
        if (watermarker.seenBefore(candidate))
        {
            heldBackEarlier = true;
            break;
        }
    }

    if (!heldBackEarlier)
    {
        // First time we see this mutation and it already has enough copies, so publish
        stats.publishedMutation(representative);
        return true;
    }

    // Previously marked as late and now has enough copies: clear the watermark and publish
    Object keyForLog = "unknown";
    if (representativePartition != null)
    {
        keyForLog = representativePartition.partitionKey();
    }
    LOGGER.info("Achieved consistency level for late partition update (partition key: '{}'). {} received.",
                keyForLog, replicasSoFar);
    watermarker.untrackReplicaCount(representative);
    stats.lateMutationPublished(representative);
    return true;
}