in tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/PerfClient.java [236:355]
/**
 * Runs every benchmark named in {@code flags.benchmarks}, one after another, against one table.
 *
 * <p>A storage client is built for {@code serviceURI} and {@code flags.namespace}, and the table
 * {@code flags.tableName} is opened through it; both are closed when this method exits. The key
 * generator, the random seed and the optional rate limiter are created once and shared by all
 * benchmarks, while the optional outstanding-request semaphore is created anew for each benchmark.
 *
 * <p>Recognised benchmark names are {@code fillseq}, {@code fillrandom}, {@code incseq} and
 * {@code incrandom}. Any other name is reported on standard error and skipped.
 *
 * @throws Exception if the client or table cannot be set up or closed, or if the calling thread is
 *     interrupted while waiting for the benchmark tasks to finish
 */
private void runBenchmarkTasks() throws Exception {
    StorageClientSettings settings = StorageClientSettings.newBuilder()
        .serviceUri(serviceURI.getUri().toString())
        .build();
    try (StorageClient client = StorageClientBuilder.newBuilder()
            .withSettings(settings)
            .withNamespace(flags.namespace)
            .build();
        Table<ByteBuf, ByteBuf> table = result(client.openTable(flags.tableName))) {

        long randSeed = System.currentTimeMillis();
        KeyGenerator generator = new KeyGenerator(flags.numKeys, flags.keysPerPrefix, flags.prefixSize);
        // A non-positive rate means "no rate limiting"; tasks are handed null in that case.
        RateLimiter limiter = flags.rate <= 0 ? null : RateLimiter.create(flags.rate);

        for (String benchmark : flags.benchmarks) {
            List<BenchmarkTask> tasks = new ArrayList<>();
            int currentTaskId = 0;
            // A non-positive bound means "no cap on outstanding requests"; tasks are handed null.
            Semaphore semaphore = flags.maxOutstandingRequests <= 0
                ? null
                : new Semaphore(flags.maxOutstandingRequests);

            switch (benchmark) {
                case "fillseq":
                    tasks.add(new WriteSequentialTask(
                        table,
                        currentTaskId++,
                        randSeed,
                        Math.max(flags.numOps, flags.numKeys),
                        flags.numKeys,
                        flags,
                        generator,
                        limiter,
                        semaphore
                    ));
                    break;
                case "fillrandom":
                    tasks.add(new WriteRandomTask(
                        table,
                        currentTaskId++,
                        randSeed,
                        Math.max(flags.numOps, flags.numKeys),
                        flags.numKeys,
                        flags,
                        generator,
                        limiter,
                        semaphore
                    ));
                    break;
                case "incseq":
                    tasks.add(new IncrementSequentialTask(
                        table,
                        currentTaskId++,
                        randSeed,
                        Math.max(flags.numOps, flags.numKeys),
                        flags.numKeys,
                        flags,
                        generator,
                        limiter,
                        semaphore
                    ));
                    break;
                case "incrandom":
                    tasks.add(new IncrementRandomTask(
                        table,
                        currentTaskId++,
                        randSeed,
                        Math.max(flags.numOps, flags.numKeys),
                        flags.numKeys,
                        flags,
                        generator,
                        limiter,
                        semaphore
                    ));
                    break;
                default:
                    System.err.println("Unknown benchmark: " + benchmark);
                    break;
            }
            if (tasks.isEmpty()) {
                // Nothing was scheduled for this name (unknown benchmark); move on to the next one.
                continue;
            }

            // One worker thread per task; the latch opens once every task has finished or failed.
            final CountDownLatch latch = new CountDownLatch(tasks.size());
            @Cleanup("shutdown")
            ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
            for (BenchmarkTask task : tasks) {
                executor.submit(() -> {
                    try {
                        task.runTask();
                    } catch (Exception e) {
                        log.error("Encountered issue at running benchmark task {}",
                            task.tid, e);
                    } finally {
                        // Always count down so a failing task cannot leave the caller blocked.
                        latch.countDown();
                    }
                });
            }

            // shutdown() only rejects new submissions and never interrupts a task that is already
            // running, so a reporter that is still going would keep its thread alive after this
            // benchmark has completed. shutdownNow() interrupts it instead.
            // NOTE(review): reportStats is defined outside this method and is presumed to run until
            // interrupted - confirm that it exits cleanly on interruption.
            @Cleanup("shutdownNow")
            ExecutorService statsExecutor = Executors.newSingleThreadExecutor();
            statsExecutor.submit(() -> reportStats(tasks));

            latch.await();

            log.info("------------------- DONE -----------------------");
            tasks.forEach(task -> task.printAggregatedStats());
        }
    }
}