public static void main()

in bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java [159:300]


    public static void main(String[] args) throws Exception {
        Options options = new Options();
        options.addOption("ledger", true, "Ledger to read. If empty, read all ledgers which come available. "
                          + " Cannot be used with -listen");
        //How to generate ledger node path.
        options.addOption("ledgerManagerType", true, "The ledger manager type. "
                + "The optional value: flat, hierarchical, legacyHierarchical, longHierarchical. Default: flat");
        options.addOption("listen", true, "Listen for creation of <arg> ledgers, and read each one fully");
        options.addOption("password", true, "Password used to access ledgers (default 'benchPasswd')");
        options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
        options.addOption("sockettimeout", true, "Socket timeout for bookkeeper client. In seconds. Default 5");
        options.addOption("useV2", false, "Whether use V2 protocol to read ledgers from the bookie server.");
        options.addOption("help", false, "This message");
        options.addOption("batchentries", true, "The batch read entries count. "
                + "If the value is greater than 0, uses batch read. Or uses the single read. Default 1000");

        CommandLineParser parser = new PosixParser();
        CommandLine cmd = parser.parse(options, args);

        if (cmd.hasOption("help")) {
            usage(options);
            System.exit(-1);
        }

        final String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
        final byte[] passwd = cmd.getOptionValue("password", "benchPasswd").getBytes(UTF_8);
        final int sockTimeout = Integer.parseInt(cmd.getOptionValue("sockettimeout", "5"));
        final int batchentries = Integer.parseInt(cmd.getOptionValue("batchentries", "1000"));
        if (cmd.hasOption("ledger") && cmd.hasOption("listen")) {
            LOG.error("Cannot used -ledger and -listen together");
            usage(options);
            System.exit(-1);
        }

        final AtomicInteger ledger = new AtomicInteger(0);
        final AtomicInteger numLedgers = new AtomicInteger(0);
        if (cmd.hasOption("ledger")) {
            ledger.set(Integer.parseInt(cmd.getOptionValue("ledger")));
        } else if (cmd.hasOption("listen")) {
            numLedgers.set(Integer.parseInt(cmd.getOptionValue("listen")));
        } else {
            LOG.error("You must use -ledger or -listen");
            usage(options);
            System.exit(-1);
        }

        final CountDownLatch shutdownLatch = new CountDownLatch(1);

        String ledgerManagerType = cmd.getOptionValue("ledgerManagerType", "flat");
        String nodepath;
        if ("flat".equals(ledgerManagerType)) {
            nodepath = String.format("/ledgers/L%010d", ledger.get());
        } else if ("hierarchical".equals(ledgerManagerType)) {
            nodepath = String.format("/ledgers%s", StringUtils.getHybridHierarchicalLedgerPath(ledger.get()));
        } else if ("legacyHierarchical".equals(ledgerManagerType)) {
            nodepath = String.format("/ledgers%s", StringUtils.getShortHierarchicalLedgerPath(ledger.get()));
        } else if ("longHierarchical".equals(ledgerManagerType)) {
            nodepath = String.format("/ledgers%s", StringUtils.getLongHierarchicalLedgerPath(ledger.get()));
        } else {
            LOG.warn("Unknown ledger manager type: {}, use flat as the value", ledgerManagerType);
            nodepath = String.format("/ledgers/L%010d", ledger.get());
        }

        final ClientConfiguration conf = new ClientConfiguration();
        conf.setReadTimeout(sockTimeout).setZkServers(servers);

        if (cmd.hasOption("useV2")) {
            conf.setUseV2WireProtocol(true);
        }

        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder()
                .connectString(servers)
                .sessionTimeoutMs(3000)
                .build()) {
            final Set<String> processedLedgers = new HashSet<String>();
            zk.register(new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        try {
                            if (event.getType() == Event.EventType.NodeCreated
                                       && event.getPath().equals(nodepath)) {
                                readLedger(conf, ledger.get(), passwd, batchentries);
                                shutdownLatch.countDown();
                            } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
                                if (numLedgers.get() < 0) {
                                    return;
                                }
                                List<String> children = zk.getChildren("/ledgers", true);
                                List<String> ledgers = new ArrayList<String>();
                                for (String child : children) {
                                    if (LEDGER_PATTERN.matcher(child).find()) {
                                        ledgers.add(child);
                                    }
                                }
                                for (String ledger : ledgers) {
                                    synchronized (processedLedgers) {
                                        if (processedLedgers.contains(ledger)) {
                                            continue;
                                        }
                                        final Matcher m = LEDGER_PATTERN.matcher(ledger);
                                        if (m.find()) {
                                            int ledgersLeft = numLedgers.decrementAndGet();
                                            final Long ledgerId = Long.valueOf(m.group(1));
                                            processedLedgers.add(ledger);
                                            Thread t = new Thread() {
                                                @Override
                                                public void run() {
                                                    readLedger(conf, ledgerId, passwd, batchentries);
                                                }
                                            };
                                            t.start();
                                            if (ledgersLeft <= 0) {
                                                shutdownLatch.countDown();
                                            }
                                        } else {
                                            LOG.error("Cant file ledger id in {}", ledger);
                                        }
                                    }
                                }
                            } else {
                                LOG.warn("Unknown event {}", event);
                            }
                        } catch (Exception e) {
                            LOG.error("Exception in watcher", e);
                        }
                    }
                });

            if (ledger.get() != 0) {
                if (zk.exists(nodepath, true) != null) {
                    readLedger(conf, ledger.get(), passwd, batchentries);
                    shutdownLatch.countDown();
                } else {
                    LOG.info("Watching for creation of" + nodepath);
                }
            } else {
                zk.getChildren("/ledgers", true);
            }
            shutdownLatch.await();
            LOG.info("Shutting down");
        }
    }