in gremlin-client-demo/src/main/java/software/amazon/neptune/RetryDemo.java [88:219]
/**
 * Runs the retry demo.
 *
 * <p>Creates a refresh agent plus one cluster context for the read replicas and one for the
 * primary, then starts a fixed pool of worker threads that share the two
 * {@link GraphTraversalSource}s. Workers claim query numbers from a shared counter until
 * {@code queryCount} queries have been claimed; every query number divisible by 7 is a write
 * (two vertices and a connecting edge) sent to the writer, every other one is a read sent to the
 * reader. Each query is executed through a retry4j {@code CallExecutor} configured with
 * exponential backoff, a maximum of 5 tries and a 1-second delay setting, retrying only when
 * {@code RetryUtils.isRetryableException} reports the exception as retryable.
 *
 * <p>The refresh agent, both cluster contexts and the thread pool are released whether or not the
 * run succeeds. Any failure is logged and terminates the JVM with exit status -1.
 */
public void run() {
    // Pool size and number of submitted worker tasks (one task per thread).
    final int workerCount = 5;

    try {
        ClusterEndpointsRefreshAgent refreshAgent = createRefreshAgent();
        ClusterContext readerContext = null;
        ClusterContext writerContext = null;
        ExecutorService taskExecutor = null;

        // Cleanup lives in an inner try/finally: the outer catch calls System.exit(), so a
        // finally attached to the outer try would never get the chance to complete.
        try {
            RetryConfig retryConfig = new RetryConfigBuilder()
                    .retryOnCustomExceptionLogic(e -> {
                        RetryUtils.Result result = RetryUtils.isRetryableException(e);
                        logger.info("isRetriableException: {}", result);
                        return result.isRetryable();
                    })
                    .withExponentialBackoff()
                    .withMaxNumberOfTries(5)
                    .withDelayBetweenTries(1, ChronoUnit.SECONDS)
                    .build();

            readerContext = createClusterContext(retryConfig, refreshAgent, EndpointsType.ReadReplicas);
            writerContext = createClusterContext(retryConfig, refreshAgent, EndpointsType.Primary);

            // Use same GraphTraversalSources across threads
            GraphTraversalSource gReader = readerContext.graphTraversalSource();
            GraphTraversalSource gWriter = writerContext.graphTraversalSource();

            logger.info("Starting queries...");

            AtomicInteger currentQueryCount = new AtomicInteger(0);

            // The worker keeps all of its state in locals, so one Runnable instance can safely
            // be submitted once per pool thread.
            Runnable worker = () -> {
                try {
                    int readCount = 0;
                    int writeCount = 0;
                    int tries = 0;
                    int failedReads = 0;
                    int failedWrites = 0;

                    CallExecutor executor = new CallExecutorBuilder()
                            .config(retryConfig)
                            .build();

                    // Claim a query number and test it in one step. Testing the previously
                    // claimed number instead would let every worker run one query past
                    // queryCount when several threads race on the counter.
                    int count;
                    while ((count = currentQueryCount.incrementAndGet()) <= queryCount) {

                        if (count % 7 == 0) {
                            // write
                            writeCount++;
                            Callable<Edge> query = () ->
                                    gWriter.addV("Thing").as("v1").
                                            addV("Thing").as("v2").
                                            addE("Connection").from("v1").to("v2").
                                            next();
                            try {
                                Status<Edge> status = executor.execute(query);
                                tries += status.getTotalTries();
                            } catch (RetriesExhaustedException e) {
                                failedWrites++;
                            }
                        } else {
                            // read
                            readCount++;
                            Callable<List<Map<Object, Object>>> query = () ->
                                    gReader.V().limit(10).valueMap(true).toList();
                            try {
                                // The result list is intentionally discarded; only the number
                                // of tries is of interest to the demo.
                                Status<List<Map<Object, Object>>> status = executor.execute(query);
                                tries += status.getTotalTries();
                            } catch (RetriesExhaustedException e) {
                                failedReads++;
                            }
                        }

                        logger.info("Progress: [queries: {}, tries: {}, reads: {}, writes: {}, failedReads: {}, failedWrites: {}]",
                                (readCount + writeCount),
                                tries,
                                readCount,
                                writeCount,
                                failedReads,
                                failedWrites);
                    }
                } catch (Exception e) {
                    logger.error("Unexpected error", e);
                }
            };

            taskExecutor = Executors.newFixedThreadPool(workerCount);
            for (int i = 0; i < workerCount; i++) {
                taskExecutor.submit(worker);
            }

            taskExecutor.shutdown();

            try {
                if (!taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
                    logger.warn("Timeout expired with uncompleted tasks");
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating so callers can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        } finally {
            logger.info("Closing...");

            // Stop any workers still running (only possible on a failure/interruption path).
            if (taskExecutor != null && !taskExecutor.isTerminated()) {
                taskExecutor.shutdownNow();
            }

            // Nested finally blocks so that one failing close() does not skip the others.
            try {
                refreshAgent.close();
            } finally {
                try {
                    if (readerContext != null) {
                        readerContext.close();
                    }
                } finally {
                    if (writerContext != null) {
                        writerContext.close();
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error("Retry demo failed", e);
        System.exit(-1);
    }
}