static boolean filter()

in cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CdcScannerBuilder.java [424:466]


    /**
     * Decides whether a mutation, represented by the copies of it collected in {@code updates},
     * should be published in the current batch.
     *
     * <p>The replica total is the number of copies in {@code updates} plus whatever
     * {@code watermarker.replicaCount} reports for the first copy (presumably copies counted in
     * earlier rounds; confirm against the {@code Watermarker} implementation).
     *
     * <ul>
     *   <li>Total below {@code minimumReplicasPerMutation}: the total is recorded on the watermarker,
     *       the shortfall is reported to {@code stats}, and {@code false} is returned.</li>
     *   <li>Total sufficient and at least one copy was seen before by the watermarker: the tracked
     *       replica count is cleared, a late publication is reported, and {@code true} is returned.</li>
     *   <li>Total sufficient and no copy was seen before: a normal publication is reported and
     *       {@code true} is returned.</li>
     * </ul>
     *
     * @param updates                    copies of the mutation; only the first element is used as the
     *                                   representative for bookkeeping; must not be empty
     * @param minimumReplicasPerMutation number of replica copies required before publishing
     * @param watermarker                tracks replica counts and previously seen mutations
     * @param stats                      receives the outcome of the decision
     * @return {@code true} if the mutation should be published, {@code false} if it must wait
     * @throws IllegalStateException if {@code updates} is empty
     */
    static boolean filter(List<PartitionUpdateWrapper> updates,
                          int minimumReplicasPerMutation,
                          Watermarker watermarker,
                          Stats stats)
    {
        if (updates.isEmpty())
        {
            throw new IllegalStateException("Should not received empty list of updates");
        }

        // The first copy stands in for the whole group in all watermarker and stats calls
        PartitionUpdateWrapper representative = updates.get(0);
        PartitionUpdate wrappedUpdate = representative.partitionUpdate();
        int replicasSeen = updates.size() + watermarker.replicaCount(representative);

        if (replicasSeen >= minimumReplicasPerMutation)
        {
            // Enough replica copies to publish; find out whether any copy was already tracked as late
            boolean previouslySeen = false;
            for (PartitionUpdateWrapper candidate : updates)
            {
                if (watermarker.seenBefore(candidate))
                {
                    previouslySeen = true;
                    break;
                }
            }

            if (!previouslySeen)
            {
                // First sighting of this mutation and the required count is already met, so publish
                stats.publishedMutation(representative);
                return true;
            }

            // Late mutation that has now gathered enough copies: stop tracking it and publish
            Object lateKey = wrappedUpdate != null ? wrappedUpdate.partitionKey() : "unknown";
            LOGGER.info("Achieved consistency level for late partition update (partition key: '{}'). {} received.",
                        lateKey, replicasSeen);
            watermarker.untrackReplicaCount(representative);
            stats.lateMutationPublished(representative);
            return true;
        }

        // Too few replica copies so far: remember the running total and retry on a subsequent round
        Object pendingKey = wrappedUpdate != null ? wrappedUpdate.partitionKey() : "unknown";
        LOGGER.warn("Ignore the partition update (partition key: '{}') for this batch "
                  + "due to insufficient replicas received. {} required {} received.",
                    pendingKey, minimumReplicasPerMutation, replicasSeen);
        watermarker.recordReplicaCount(representative, replicasSeen);
        stats.insufficientReplicas(representative, updates.size(), minimumReplicasPerMutation);
        return false;
    }