private void openReplicaOrRetry()

in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicas.java [109:152]


    private <T extends SparkSSTableReader> void openReplicaOrRetry(
            @NotNull SingleReplica replica,
            @NotNull ReaderOpener<T> readerOpener,
            @NotNull Set<T> result,
            @NotNull AtomicInteger count,
            @NotNull CountDownLatch latch,
            @NotNull ConcurrentLinkedQueue<SingleReplica> otherReplicas)
    {
        replica.openReplicaAsync(readerOpener)
               .whenComplete((readers, throwable) -> {
                   if (throwable != null)
                   {
                       LOGGER.warn("Failed to open SSTableReaders for replica node={} token={} dataCenter={}",
                                   replica.instance().nodeName(), replica.instance().token(), replica.instance().dataCenter(), throwable);
                       stats.failedToOpenReplica(replica, throwable);
                       SingleReplica anotherReplica = otherReplicas.poll();
                       if (anotherReplica != null)
                       {
                           LOGGER.warn("Retrying on another replica node={} token={} dataCenter={}",
                                       anotherReplica.instance().nodeName(), anotherReplica.instance().token(), anotherReplica.instance().dataCenter());
                           // If the failed replica was the repair primary we need the backup replacement replica to be the new repair primary
                           anotherReplica.setIsRepairPrimary(replica.isRepairPrimary());
                           openReplicaOrRetry(anotherReplica, readerOpener, result, count, latch, otherReplicas);
                       }
                       else
                       // No more replicas to retry so end
                       {
                           latch.countDown();
                       }
                       return;
                   }

                   try
                   {
                       // Successfully opened all SSTable readers
                       result.addAll(readers);
                       count.incrementAndGet();
                   }
                   finally
                   {
                       latch.countDown();
                   }
               });
    }