in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java [248:309]
/**
 * Builds an {@link SSTablesSupplier} for the given Spark partition: resolves the partition's token
 * range, selects the replica instances whose sub-ranges overlap it (optionally pruned by partition
 * key filters), and splits them into primary replicas with backups used for retry on failure.
 *
 * @param partitionId         Spark partition id; must be within {@code [0, numPartitions)}
 * @param sparkRangeFilter    optional Spark range filter
 *                            (NOTE(review): not referenced in this method body — confirm whether
 *                            filtering is handled upstream or this parameter is vestigial)
 * @param partitionKeyFilters filters used to prune replica sub-ranges that cannot contain any of
 *                            the requested partition keys; when empty, all overlapping sub-ranges
 *                            are used
 * @return a {@link MultipleReplicas} supplier over the primary replicas with backups for retries
 * @throws IllegalStateException      if {@code partitionId} is outside the expected range
 * @throws NotEnoughReplicasException if not enough primary replicas are available to satisfy the
 *                                    configured consistency level
 */
public SSTablesSupplier sstables(int partitionId,
                                 @Nullable SparkRangeFilter sparkRangeFilter,
                                 @NotNull List<PartitionKeyFilter> partitionKeyFilters)
{
    // Get token range for this Spark partition
    TokenPartitioner tokenPartitioner = tokenPartitioner();
    if (partitionId < 0 || partitionId >= tokenPartitioner.numPartitions())
    {
        throw new IllegalStateException("PartitionId outside expected range: " + partitionId);
    }

    // Get all replicas overlapping the partition token range
    Range<BigInteger> range = tokenPartitioner.getTokenRange(partitionId);
    CassandraRing ring = ring();
    ReplicationFactor replicationFactor = ring.replicationFactor();
    validateReplicationFactor(replicationFactor);
    // Reuse the local ring reference instead of calling ring() a second time
    Map<Range<BigInteger>, List<CassandraInstance>> subRanges = ring.getSubRanges(range).asMapOfRanges();
    Map<Range<BigInteger>, List<CassandraInstance>> instRanges;
    if (partitionKeyFilters.isEmpty())
    {
        instRanges = subRanges;
    }
    else
    {
        // Keep only the sub-ranges that can contain at least one requested partition key;
        // iterate entries directly to avoid a second map lookup per range
        instRanges = new HashMap<>();
        for (Map.Entry<Range<BigInteger>, List<CassandraInstance>> entry : subRanges.entrySet())
        {
            TokenRange tokenRange = RangeUtils.toTokenRange(entry.getKey());
            if (partitionKeyFilters.stream().anyMatch(filter -> filter.overlaps(tokenRange)))
            {
                instRanges.put(entry.getKey(), entry.getValue());
            }
        }
    }

    Set<CassandraInstance> replicas = PartitionedDataLayer.rangesToReplicas(consistencyLevel, datacenter, instRanges);
    LOGGER.info("Creating partitioned SSTablesSupplier for Spark partition partitionId={} rangeLower={} rangeUpper={} numReplicas={}",
                partitionId, range.lowerEndpoint(), range.upperEndpoint(), replicas.size());

    // Use consistency level and replication factor to calculate min number of replicas required
    // to satisfy consistency level; split replicas into 'primary' and 'backup' replicas,
    // attempt on primary replicas and use backups to retry in the event of a failure
    int minReplicas = consistencyLevel.blockFor(replicationFactor, datacenter);
    ReplicaSet replicaSet = PartitionedDataLayer.splitReplicas(
            consistencyLevel, datacenter, instRanges, replicas, this::getAvailability, minReplicas, partitionId);
    if (replicaSet.primary().size() < minReplicas)
    {
        // Could not find enough primary replicas to meet consistency level
        assert replicaSet.backup().isEmpty();
        throw new NotEnoughReplicasException(consistencyLevel, range, minReplicas, replicas.size(), datacenter);
    }

    ExecutorService executor = executorService();
    Stats stats = stats();
    Set<SingleReplica> primaryReplicas = replicaSet.primary().stream()
            .map(instance -> new SingleReplica(instance, this, range, partitionId, executor, stats, replicaSet.isRepairPrimary(instance)))
            .collect(Collectors.toSet());
    // NOTE(review): the trailing boolean is hard-coded true for backups (isRepairPrimary for
    // primaries above) — confirm that backups are always meant to be treated this way
    Set<SingleReplica> backupReplicas = replicaSet.backup().stream()
            .map(instance -> new SingleReplica(instance, this, range, partitionId, executor, stats, true))
            .collect(Collectors.toSet());
    return new MultipleReplicas(primaryReplicas, backupReplicas, stats);
}