public void run()

in gremlin-client-demo/src/main/java/software/amazon/neptune/RetryDemo.java [88:219]


    /**
     * Runs the retry demo.
     *
     * <p>Creates a refresh agent plus one cluster context for the read replicas and one
     * for the primary, then starts a fixed pool of worker threads that share the two
     * {@link GraphTraversalSource} instances. The workers jointly issue {@code queryCount}
     * queries, each wrapped in a retrying {@code CallExecutor}. The method blocks until
     * all workers have finished and then closes the refresh agent and both contexts.
     *
     * <p>The refresh agent and contexts are closed in a {@code finally} block, so they are
     * released on the failure path as well. Any exception that escapes is logged and the
     * JVM is terminated with exit status {@code -1}.
     */
    public void run() {

        // Size of the thread pool and number of worker loops submitted to it.
        final int workerCount = 5;

        try {

            ClusterEndpointsRefreshAgent refreshAgent = null;
            ClusterContext readerContext = null;
            ClusterContext writerContext = null;

            try {
                refreshAgent = createRefreshAgent();

                RetryConfig retryConfig = new RetryConfigBuilder()
                        .retryOnCustomExceptionLogic(ex -> {
                            RetryUtils.Result result = RetryUtils.isRetryableException(ex);
                            logger.info("isRetriableException: {}", result);
                            return result.isRetryable();
                        })
                        .withExponentialBackoff()
                        .withMaxNumberOfTries(5)
                        .withDelayBetweenTries(1, ChronoUnit.SECONDS)
                        .build();

                readerContext = createClusterContext(retryConfig, refreshAgent, EndpointsType.ReadReplicas);
                writerContext = createClusterContext(retryConfig, refreshAgent, EndpointsType.Primary);

                // Use same GraphTraversalSources across threads
                GraphTraversalSource gReader = readerContext.graphTraversalSource();
                GraphTraversalSource gWriter = writerContext.graphTraversalSource();

                logger.info("Starting queries...");

                // Shared ticket counter: each query claims the next ticket number.
                AtomicInteger currentQueryCount = new AtomicInteger(0);

                ExecutorService taskExecutor = Executors.newFixedThreadPool(workerCount);

                for (int i = 0; i < workerCount; i++) {
                    taskExecutor.submit(() -> runQueryWorker(retryConfig, gReader, gWriter, currentQueryCount));
                }

                // No further tasks will be submitted; let the workers drain.
                taskExecutor.shutdown();

                try {
                    if (!taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
                        logger.warn("Timeout expired with uncompleted tasks");
                    }
                } catch (InterruptedException e) {
                    // Stop the workers rather than leaving them running behind us.
                    taskExecutor.shutdownNow();
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }

            } finally {
                logger.info("Closing...");

                // Close in the same order as before (agent, reader, writer), and attempt
                // every close even if an earlier one throws.
                try {
                    if (refreshAgent != null) {
                        refreshAgent.close();
                    }
                } finally {
                    try {
                        if (readerContext != null) {
                            readerContext.close();
                        }
                    } finally {
                        if (writerContext != null) {
                            writerContext.close();
                        }
                    }
                }
            }

        } catch (Exception e) {
            logger.error("Demo failed", e);
            System.exit(-1);
        }
    }

    /**
     * Worker loop executed on each pool thread.
     *
     * <p>Repeatedly claims the next ticket from {@code currentQueryCount} and stops as soon
     * as the claimed ticket exceeds {@code queryCount}, so the workers together never run
     * more than {@code queryCount} queries. Tickets divisible by 7 issue a write through
     * {@code gWriter}; all others issue a read through {@code gReader}. Per-thread progress
     * is logged after every query. Unexpected exceptions are logged and end this worker only.
     *
     * @param retryConfig       retry configuration used to build this worker's call executor
     * @param gReader           traversal source used for read queries
     * @param gWriter           traversal source used for write queries
     * @param currentQueryCount ticket counter shared by all workers
     */
    private void runQueryWorker(RetryConfig retryConfig,
                                GraphTraversalSource gReader,
                                GraphTraversalSource gWriter,
                                AtomicInteger currentQueryCount) {

        try {
            int readCount = 0;
            int writeCount = 0;
            int tries = 0;
            int failedReads = 0;
            int failedWrites = 0;

            CallExecutor executor = new CallExecutorBuilder()
                    .config(retryConfig)
                    .build();

            int count;

            // Claim a ticket first, then test it: testing a stale local value before
            // incrementing would let several threads overshoot queryCount.
            while ((count = currentQueryCount.incrementAndGet()) <= queryCount) {

                if (count % 7 == 0) {
                    // write
                    writeCount++;

                    Callable<Edge> query = () ->
                            gWriter.addV("Thing").as("v1").
                                    addV("Thing").as("v2").
                                    addE("Connection").from("v1").to("v2").
                                    next();

                    try {
                        Status<Edge> status = executor.execute(query);
                        tries += status.getTotalTries();
                    } catch (RetriesExhaustedException e) {
                        failedWrites++;
                        logger.warn("Write query failed after exhausting retries", e);
                    }

                } else {
                    // read
                    readCount++;

                    // toList() already materialises the results; nothing else is done with them.
                    Callable<List<Map<Object, Object>>> query = () ->
                            gReader.V().limit(10).valueMap(true).toList();

                    try {
                        Status<List<Map<Object, Object>>> status = executor.execute(query);
                        tries += status.getTotalTries();
                    } catch (RetriesExhaustedException e) {
                        failedReads++;
                        logger.warn("Read query failed after exhausting retries", e);
                    }
                }

                logger.info("Progress: [queries: {}, tries: {}, reads: {}, writes: {}, failedReads: {}, failedWrites: {}]",
                        (readCount + writeCount),
                        tries,
                        readCount,
                        writeCount,
                        failedReads,
                        failedWrites);
            }
        } catch (Exception e) {
            logger.error("Unexpected error", e);
        }
    }