in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicas.java [109:152]
/**
 * Asynchronously opens SSTable readers on {@code replica}; on failure, retries on the next
 * available replica polled from {@code otherReplicas}, repeating until an attempt succeeds
 * or no backup replicas remain.
 *
 * The {@code latch} is counted down exactly once per retry chain: either when an attempt
 * succeeds, or when the chain ends with no more replicas to try. If the failed replica was
 * the repair primary, the backup replica inherits the repair-primary role.
 *
 * @param replica       replica to open SSTable readers on
 * @param readerOpener  opener used to construct the SSTable readers for a replica
 * @param result        shared set collecting successfully opened readers
 * @param count         shared counter of replicas that opened successfully
 * @param latch         completion latch; counted down once when this retry chain terminates
 * @param otherReplicas queue of backup replicas to retry on failure
 */
private <T extends SparkSSTableReader> void openReplicaOrRetry(
@NotNull SingleReplica replica,
@NotNull ReaderOpener<T> readerOpener,
@NotNull Set<T> result,
@NotNull AtomicInteger count,
@NotNull CountDownLatch latch,
@NotNull ConcurrentLinkedQueue<SingleReplica> otherReplicas)
{
    replica.openReplicaAsync(readerOpener)
           .whenComplete((readers, throwable) -> {
               if (throwable != null)
               {
                   // Tracks whether the recursive retry attempt has taken over responsibility
                   // for counting down the latch. Anything below (stats callback, logging,
                   // the retry kickoff) may throw inside this callback; without the finally
                   // guard the latch would leak and callers awaiting it would hang forever.
                   boolean delegatedToRetry = false;
                   try
                   {
                       LOGGER.warn("Failed to open SSTableReaders for replica node={} token={} dataCenter={}",
                                   replica.instance().nodeName(), replica.instance().token(), replica.instance().dataCenter(), throwable);
                       stats.failedToOpenReplica(replica, throwable);
                       SingleReplica anotherReplica = otherReplicas.poll();
                       if (anotherReplica != null)
                       {
                           LOGGER.warn("Retrying on another replica node={} token={} dataCenter={}",
                                       anotherReplica.instance().nodeName(), anotherReplica.instance().token(), anotherReplica.instance().dataCenter());
                           // If the failed replica was the repair primary we need the backup replacement replica to be the new repair primary
                           anotherReplica.setIsRepairPrimary(replica.isRepairPrimary());
                           openReplicaOrRetry(anotherReplica, readerOpener, result, count, latch, otherReplicas);
                           // The retry chain now owns the latch: it counts down on its own completion
                           delegatedToRetry = true;
                       }
                   }
                   finally
                   {
                       if (!delegatedToRetry)
                       {
                           // No more replicas to retry (or the retry setup itself failed) so end
                           latch.countDown();
                       }
                   }
                   return;
               }
               try
               {
                   // Successfully opened all SSTable readers
                   result.addAll(readers);
                   count.incrementAndGet();
               }
               finally
               {
                   latch.countDown();
               }
           });
}