private void runBenchmarkTasks()

in tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/PerfClient.java [236:355]


    private void runBenchmarkTasks() throws Exception {
        StorageClientSettings settings = StorageClientSettings.newBuilder()
            .serviceUri(serviceURI.getUri().toString())
            .build();
        try (StorageClient client = StorageClientBuilder.newBuilder()
             .withSettings(settings)
             .withNamespace(flags.namespace)
             .build()) {
            try (Table<ByteBuf, ByteBuf> table = result(client.openTable(flags.tableName))) {

                long randSeed = System.currentTimeMillis();
                KeyGenerator generator = new KeyGenerator(flags.numKeys, flags.keysPerPrefix, flags.prefixSize);
                RateLimiter limiter;
                if (flags.rate <= 0) {
                    limiter = null;
                } else {
                    limiter = RateLimiter.create(flags.rate);
                }

                for (String benchmark : flags.benchmarks) {
                    List<BenchmarkTask> tasks = new ArrayList<>();
                    int currentTaskId = 0;
                    Semaphore semaphore;
                    if (flags.maxOutstandingRequests <= 0) {
                        semaphore = null;
                    } else {
                        semaphore = new Semaphore(flags.maxOutstandingRequests);
                    }

                    switch (benchmark) {
                        case "fillseq":
                            tasks.add(new WriteSequentialTask(
                                table,
                                currentTaskId++,
                                randSeed,
                                Math.max(flags.numOps, flags.numKeys),
                                flags.numKeys,
                                flags,
                                generator,
                                limiter,
                                semaphore
                            ));
                            break;
                        case "fillrandom":
                            tasks.add(new WriteRandomTask(
                                table,
                                currentTaskId++,
                                randSeed,
                                Math.max(flags.numOps, flags.numKeys),
                                flags.numKeys,
                                flags,
                                generator,
                                limiter,
                                semaphore
                            ));
                            break;
                        case "incseq":
                            tasks.add(new IncrementSequentialTask(
                                table,
                                currentTaskId++,
                                randSeed,
                                Math.max(flags.numOps, flags.numKeys),
                                flags.numKeys,
                                flags,
                                generator,
                                limiter,
                                semaphore
                            ));
                            break;
                        case "incrandom":
                            tasks.add(new IncrementRandomTask(
                                table,
                                currentTaskId++,
                                randSeed,
                                Math.max(flags.numOps, flags.numKeys),
                                flags.numKeys,
                                flags,
                                generator,
                                limiter,
                                semaphore
                            ));
                            break;
                        default:
                            System.err.println("Unknown benchmark: " + benchmark);
                            break;
                    }

                    if (tasks.isEmpty()) {
                        continue;
                    }

                    final CountDownLatch latch = new CountDownLatch(tasks.size());
                    @Cleanup("shutdown")
                    ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
                    for (BenchmarkTask task : tasks) {
                        executor.submit(() -> {
                            try {
                                task.runTask();
                            } catch (Exception e) {
                                log.error("Encountered issue at running benchmark task {}",
                                    task.tid, e);
                            } finally {
                                latch.countDown();
                            }

                        });
                    }

                    @Cleanup("shutdown")
                    ExecutorService statsExecutor = Executors.newSingleThreadExecutor();
                    statsExecutor.submit(() -> reportStats(tasks));

                    latch.await();

                    log.info("------------------- DONE -----------------------");
                    tasks.forEach(task -> task.printAggregatedStats());
                }
            }
        }
    }