public SSTablesSupplier sstables()

in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java [248:309]


    /**
     * Builds an {@link SSTablesSupplier} reading from the replicas that own the token range
     * assigned to the given Spark partition. Replicas are split into a 'primary' set, attempted
     * first, and a 'backup' set used to retry in the event of a failure.
     *
     * @param partitionId         Spark partition id; must be in {@code [0, numPartitions)}
     * @param sparkRangeFilter    optional Spark token-range filter.
     *                            NOTE(review): this parameter is not referenced anywhere in this
     *                            method body — confirm whether range filtering is expected to
     *                            happen downstream or whether it should be applied here
     * @param partitionKeyFilters partition key filters used to prune sub-ranges that cannot
     *                            contain any requested key
     * @return a {@link MultipleReplicas} supplier over the primary replicas, with backups for retries
     * @throws IllegalStateException      if {@code partitionId} is outside the expected range
     * @throws NotEnoughReplicasException if too few primary replicas are available to satisfy
     *                                    the configured consistency level
     */
    public SSTablesSupplier sstables(int partitionId,
                                     @Nullable SparkRangeFilter sparkRangeFilter,
                                     @NotNull List<PartitionKeyFilter> partitionKeyFilters)
    {
        // Get token range for Spark partition
        TokenPartitioner tokenPartitioner = tokenPartitioner();
        if (partitionId < 0 || partitionId >= tokenPartitioner.numPartitions())
        {
            throw new IllegalStateException("PartitionId outside expected range: " + partitionId);
        }

        // Get all replicas overlapping partition token range
        Range<BigInteger> range = tokenPartitioner.getTokenRange(partitionId);
        CassandraRing ring = ring();
        ReplicationFactor replicationFactor = ring.replicationFactor();
        validateReplicationFactor(replicationFactor);
        Map<Range<BigInteger>, List<CassandraInstance>> instRanges;
        // Reuse the ring fetched above rather than calling ring() a second time
        Map<Range<BigInteger>, List<CassandraInstance>> subRanges = ring.getSubRanges(range).asMapOfRanges();
        if (partitionKeyFilters.isEmpty())
        {
            instRanges = subRanges;
        }
        else
        {
            // Keep only the sub-ranges that at least one partition key filter can overlap;
            // iterate entries directly instead of keySet() + get() per key
            instRanges = new HashMap<>();
            subRanges.forEach((instRange, instances) -> {
                TokenRange tokenRange = RangeUtils.toTokenRange(instRange);
                if (partitionKeyFilters.stream().anyMatch(filter -> filter.overlaps(tokenRange)))
                {
                    instRanges.putIfAbsent(instRange, instances);
                }
            });
        }

        Set<CassandraInstance> replicas = PartitionedDataLayer.rangesToReplicas(consistencyLevel, datacenter, instRanges);
        LOGGER.info("Creating partitioned SSTablesSupplier for Spark partition partitionId={} rangeLower={} rangeUpper={} numReplicas={}",
                    partitionId, range.lowerEndpoint(), range.upperEndpoint(), replicas.size());

        // Use consistency level and replication factor to calculate min number of replicas required
        // to satisfy consistency level; split replicas into 'primary' and 'backup' replicas,
        // attempt on primary replicas and use backups to retry in the event of a failure
        int minReplicas = consistencyLevel.blockFor(replicationFactor, datacenter);
        ReplicaSet replicaSet = PartitionedDataLayer.splitReplicas(
                consistencyLevel, datacenter, instRanges, replicas, this::getAvailability, minReplicas, partitionId);
        if (replicaSet.primary().size() < minReplicas)
        {
            // Could not find enough primary replicas to meet consistency level
            assert replicaSet.backup().isEmpty();
            throw new NotEnoughReplicasException(consistencyLevel, range, minReplicas, replicas.size(), datacenter);
        }

        ExecutorService executor = executorService();
        Stats stats = stats();
        Set<SingleReplica> primaryReplicas = replicaSet.primary().stream()
                .map(instance -> new SingleReplica(instance, this, range, partitionId, executor, stats, replicaSet.isRepairPrimary(instance)))
                .collect(Collectors.toSet());
        Set<SingleReplica> backupReplicas = replicaSet.backup().stream()
                .map(instance -> new SingleReplica(instance, this, range, partitionId, executor, stats, true))
                .collect(Collectors.toSet());

        return new MultipleReplicas(primaryReplicas, backupReplicas, stats);
    }