in harry-core/src/harry/model/sut/injvm/InJVMTokenAwareVisitExecutor.java [82:115]
private void executeAsyncWithRetries(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement, int retries)
{
if (sut.isShutdown())
throw new IllegalStateException("System under test is shut down");
if (retries > this.MAX_RETRIES)
throw new IllegalStateException(String.format("Can not execute statement %s after %d retries", statement, retries));
Object[] pk = schema.inflatePartitionKey(pd);
List<TokenPlacementModel.Node> replicas = getReplicas(getRing(), TokenUtil.token(ByteUtils.compose(ByteUtils.objectsToBytes(pk))));
TokenPlacementModel.Node replica = replicas.get((int) (lts % replicas.size()));
if (cl == SystemUnderTest.ConsistencyLevel.NODE_LOCAL)
{
future.complete(executeNodeLocal(statement.cql(), replica, statement.bindings()));
}
else
{
CompletableFuture.supplyAsync(() -> sut.cluster
.stream()
.filter((n) -> n.config().broadcastAddress().toString().contains(replica.id))
.findFirst()
.get()
.coordinator()
.execute(statement.cql(), InJvmSut.toApiCl(cl), statement.bindings()), executor)
.whenComplete((res, t) ->
{
if (t != null)
executor.schedule(() -> executeAsyncWithRetries(lts, pd, future, statement, retries + 1), 1, TimeUnit.SECONDS);
else
future.complete(res);
});
}
}