in bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java [159:300]
/**
 * Command-line entry point of the read benchmark.
 *
 * <p>Exactly one of two modes must be selected:
 * <ul>
 *   <li>{@code -ledger <id>}: read the given ledger. If its ZooKeeper node does not exist yet,
 *       a watch is left on the node path and the ledger is read once the node is created.</li>
 *   <li>{@code -listen <n>}: watch the children of {@code /ledgers} and start one reader thread
 *       for every child matching {@code LEDGER_PATTERN} that has not been handled before; the
 *       shutdown latch is released once {@code n} ledgers have been handed to reader threads.</li>
 * </ul>
 *
 * <p>On {@code -help}, or when the mode options are missing or combined, the usage text is
 * printed and the JVM exits with status -1.
 *
 * @param args command-line arguments, parsed with the options registered below
 * @throws Exception if option parsing, a numeric option value, or the ZooKeeper client fails
 */
public static void main(String[] args) throws Exception {
    Options options = new Options();
    options.addOption("ledger", true, "Ledger to read. If empty, read all ledgers which come available. "
            + " Cannot be used with -listen");
    // How to generate ledger node path.
    options.addOption("ledgerManagerType", true, "The ledger manager type. "
            + "The optional value: flat, hierarchical, legacyHierarchical, longHierarchical. Default: flat");
    options.addOption("listen", true, "Listen for creation of <arg> ledgers, and read each one fully");
    options.addOption("password", true, "Password used to access ledgers (default 'benchPasswd')");
    options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
    options.addOption("sockettimeout", true, "Socket timeout for bookkeeper client. In seconds. Default 5");
    options.addOption("useV2", false, "Whether use V2 protocol to read ledgers from the bookie server.");
    options.addOption("help", false, "This message");
    options.addOption("batchentries", true, "The batch read entries count. "
            + "If the value is greater than 0, uses batch read. Or uses the single read. Default 1000");

    CommandLineParser parser = new PosixParser();
    CommandLine cmd = parser.parse(options, args);
    if (cmd.hasOption("help")) {
        usage(options);
        System.exit(-1);
    }

    final String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
    final byte[] passwd = cmd.getOptionValue("password", "benchPasswd").getBytes(UTF_8);
    final int sockTimeout = Integer.parseInt(cmd.getOptionValue("sockettimeout", "5"));
    final int batchentries = Integer.parseInt(cmd.getOptionValue("batchentries", "1000"));

    // The mode is decided by which option is present, not by the parsed value,
    // so that "-ledger 0" selects single-ledger mode like any other id.
    final boolean singleLedgerMode = cmd.hasOption("ledger");
    final boolean listenMode = cmd.hasOption("listen");
    if (singleLedgerMode && listenMode) {
        LOG.error("Cannot use -ledger and -listen together");
        usage(options);
        System.exit(-1);
    }
    if (!singleLedgerMode && !listenMode) {
        LOG.error("You must use -ledger or -listen");
        usage(options);
        System.exit(-1);
    }

    // Parsed as long: the listen path below and readLedger already handle long ledger ids.
    final long targetLedgerId = singleLedgerMode ? Long.parseLong(cmd.getOptionValue("ledger")) : 0L;
    // Number of ledgers still to be picked up in listen mode; stays 0 in single-ledger mode.
    final AtomicInteger numLedgers = new AtomicInteger(
            listenMode ? Integer.parseInt(cmd.getOptionValue("listen")) : 0);

    final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // ZooKeeper node path of the target ledger for the configured ledger manager layout.
    final String ledgerManagerType = cmd.getOptionValue("ledgerManagerType", "flat");
    final String nodepath;
    if ("flat".equals(ledgerManagerType)) {
        nodepath = String.format("/ledgers/L%010d", targetLedgerId);
    } else if ("hierarchical".equals(ledgerManagerType)) {
        nodepath = String.format("/ledgers%s", StringUtils.getHybridHierarchicalLedgerPath(targetLedgerId));
    } else if ("legacyHierarchical".equals(ledgerManagerType)) {
        nodepath = String.format("/ledgers%s", StringUtils.getShortHierarchicalLedgerPath(targetLedgerId));
    } else if ("longHierarchical".equals(ledgerManagerType)) {
        nodepath = String.format("/ledgers%s", StringUtils.getLongHierarchicalLedgerPath(targetLedgerId));
    } else {
        LOG.warn("Unknown ledger manager type: {}, use flat as the value", ledgerManagerType);
        nodepath = String.format("/ledgers/L%010d", targetLedgerId);
    }

    final ClientConfiguration conf = new ClientConfiguration();
    conf.setReadTimeout(sockTimeout).setZkServers(servers);
    if (cmd.hasOption("useV2")) {
        conf.setUseV2WireProtocol(true);
    }

    try (ZooKeeperClient zk = ZooKeeperClient.newBuilder()
            .connectString(servers)
            .sessionTimeoutMs(3000)
            .build()) {
        // Children of /ledgers that already have a reader thread; guarded by its own monitor.
        final Set<String> processedLedgers = new HashSet<String>();
        zk.register(new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    if (event.getType() == Event.EventType.NodeCreated
                            && nodepath.equals(event.getPath())) {
                        // Single-ledger mode: the awaited node appeared; read it from this callback.
                        readLedger(conf, targetLedgerId, passwd, batchentries);
                        shutdownLatch.countDown();
                    } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        // NOTE(review): with "< 0" an event arriving when the counter is exactly 0
                        // still starts readers; kept as-is to preserve "-listen 0" behavior - confirm.
                        if (numLedgers.get() < 0) {
                            return;
                        }
                        // Passing true asks for the watch to be set again for the next change.
                        List<String> children = zk.getChildren("/ledgers", true);
                        for (String child : children) {
                            final Matcher m = LEDGER_PATTERN.matcher(child);
                            if (!m.find()) {
                                // Not a ledger node name; ignore it.
                                continue;
                            }
                            synchronized (processedLedgers) {
                                if (!processedLedgers.add(child)) {
                                    // A reader was already started for this ledger.
                                    continue;
                                }
                            }
                            int ledgersLeft = numLedgers.decrementAndGet();
                            final long ledgerId = Long.parseLong(m.group(1));
                            Thread t = new Thread() {
                                @Override
                                public void run() {
                                    readLedger(conf, ledgerId, passwd, batchentries);
                                }
                            };
                            t.start();
                            if (ledgersLeft <= 0) {
                                // Requested number of ledgers has been handed to readers.
                                shutdownLatch.countDown();
                            }
                        }
                    } else {
                        LOG.warn("Unknown event {}", event);
                    }
                } catch (Exception e) {
                    LOG.error("Exception in watcher", e);
                }
            }
        });

        if (singleLedgerMode) {
            if (zk.exists(nodepath, true) != null) {
                readLedger(conf, targetLedgerId, passwd, batchentries);
                shutdownLatch.countDown();
            } else {
                LOG.info("Watching for creation of {}", nodepath);
            }
        } else {
            // Listen mode: arm the child watch; the watcher above handles the notifications.
            zk.getChildren("/ledgers", true);
        }

        shutdownLatch.await();
        LOG.info("Shutting down");
    }
}