public static void main()

in bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java [131:244]


    /**
     * Command-line entry point of the bookie benchmark.
     *
     * <p>Parses the options, creates a {@link BookieClient} and runs three phases against the
     * bookie given by {@code -host}/{@code -port}. Every phase uses a ledger id obtained from
     * {@code getValidLedgerId(servers)}:
     * <ol>
     *   <li>warmup: {@code warmupCount} adds are issued back to back, then the completions are
     *       awaited;</li>
     *   <li>latency: {@code latencyCount} adds are issued one at a time, waiting on the latency
     *       callback after each one; the mean time per add is logged in milliseconds;</li>
     *   <li>throughput: {@code throughputCount} adds are issued back to back, then the
     *       completions are awaited; the number of adds per second is logged.</li>
     * </ol>
     *
     * <p>If {@code -help} is given or {@code -host} is missing, the usage text is printed and the
     * JVM exits with status -1. The client, executors and event loop are released even when a
     * phase fails.
     *
     * @param args command-line arguments; see the option descriptions registered below
     * @throws ParseException if the command line cannot be parsed
     * @throws NumberFormatException if a numeric option does not hold an integer
     * @throws InterruptedException propagated from the calls made by the benchmark
     * @throws IOException propagated from the calls made by the benchmark
     * @throws BKException propagated from the calls made by the benchmark
     * @throws KeeperException propagated from the calls made by the benchmark
     */
    public static void main(String[] args)
            throws InterruptedException, ParseException, IOException,
            BKException, KeeperException {
        Options options = new Options();
        options.addOption("host", true, "Hostname or IP of bookie to benchmark");
        options.addOption("port", true, "Port of bookie to benchmark (default 3181)");
        options.addOption("zookeeper", true, "Zookeeper ensemble, (default \"localhost:2181\")");
        options.addOption("size", true, "Size of message to send, in bytes (default 1024)");
        options.addOption("warmupCount", true, "Number of messages in warmup phase (default 999)");
        options.addOption("latencyCount", true, "Number of messages in latency phase (default 5000)");
        options.addOption("throughputCount", true, "Number of messages in throughput phase (default 50000)");
        options.addOption("help", false, "This message");

        CommandLineParser parser = new PosixParser();
        CommandLine cmd = parser.parse(options, args);

        // The host is the only mandatory option; without it there is nothing to benchmark.
        if (cmd.hasOption("help") || !cmd.hasOption("host")) {
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp("BenchBookie <options>", options);
            System.exit(-1);
        }

        String addr = cmd.getOptionValue("host");
        int port = Integer.parseInt(cmd.getOptionValue("port", "3181"));
        int size = Integer.parseInt(cmd.getOptionValue("size", "1024"));
        String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
        int warmUpCount = Integer.parseInt(cmd.getOptionValue("warmupCount", "999"));
        int latencyCount = Integer.parseInt(cmd.getOptionValue("latencyCount", "5000"));
        int throughputCount = Integer.parseInt(cmd.getOptionValue("throughputCount", "50000"));

        // Try Epoll on Linux and fall back to NIO if it cannot be created.
        EventLoopGroup eventLoop;
        if (SystemUtils.IS_OS_LINUX) {
            try {
                eventLoop = new EpollEventLoopGroup();
            } catch (Throwable t) {
                // The throwable is passed as the trailing argument so its stack trace is logged too.
                LOG.warn("Could not use Netty Epoll event loop for benchmark {}", t.getMessage(), t);
                eventLoop = new NioEventLoopGroup();
            }
        } else {
            eventLoop = new NioEventLoopGroup();
        }

        OrderedExecutor executor = OrderedExecutor.newBuilder()
                .name("BenchBookieClientScheduler")
                .numThreads(1)
                .build();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
                new DefaultThreadFactory("BookKeeperClientScheduler"));

        BookieClient bc = null;
        try {
            ClientConfiguration conf = new ClientConfiguration();
            bc = new BookieClientImpl(conf, eventLoop, PooledByteBufAllocator.DEFAULT, executor, scheduler,
                    NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
            // The target address does not change, so it is built once instead of once per entry.
            BookieSocketAddress bookie = new BookieSocketAddress(addr, port);

            // Phase 1: warmup, adds are issued without waiting in between.
            ThroughputCallback tc = new ThroughputCallback();
            long ledger = getValidLedgerId(servers);
            for (long entry = 0; entry < warmUpCount; entry++) {
                ByteBuf toSend = newEntryPayload(ledger, entry, size);
                bc.addEntry(bookie.toBookieId(), ledger, new byte[20],
                        entry, ByteBufList.get(toSend), tc, null, BookieProtocol.FLAG_NONE,
                        false, WriteFlag.NONE);
            }
            LOG.info("Waiting for warmup");
            // Presumably blocks until warmUpCount completions were seen; see ThroughputCallback.
            tc.waitFor(warmUpCount);

            // Phase 2: latency, exactly one add is outstanding at any time.
            LatencyCallback lc = new LatencyCallback();
            ledger = getValidLedgerId(servers);
            LOG.info("Benchmarking latency");
            long latencyStartNanos = System.nanoTime();
            for (long entry = 0; entry < latencyCount; entry++) {
                ByteBuf toSend = newEntryPayload(ledger, entry, size);
                lc.resetComplete();
                bc.addEntry(bookie.toBookieId(), ledger, new byte[20],
                        entry, ByteBufList.get(toSend), lc, null,
                        BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
                lc.waitForComplete();
            }
            long latencyNanos = System.nanoTime() - latencyStartNanos;
            // Mean time per add, converted from nanoseconds to milliseconds.
            LOG.info("Latency: {}", (((double) latencyNanos) / ((double) latencyCount)) / 1000000.0);

            // Phase 3: throughput, adds are issued without waiting in between.
            ledger = getValidLedgerId(servers);
            LOG.info("Benchmarking throughput");
            // nanoTime is monotonic, unlike the wall clock, so the interval cannot go backwards.
            long throughputStartNanos = System.nanoTime();
            tc = new ThroughputCallback();
            for (long entry = 0; entry < throughputCount; entry++) {
                ByteBuf toSend = newEntryPayload(ledger, entry, size);
                bc.addEntry(bookie.toBookieId(), ledger, new byte[20],
                        entry, ByteBufList.get(toSend), tc, null, BookieProtocol.FLAG_NONE,
                        false, WriteFlag.NONE);
            }
            tc.waitFor(throughputCount);
            // Clamp to one nanosecond so that a very short (or empty) phase cannot divide by zero.
            long throughputNanos = Math.max(1L, System.nanoTime() - throughputStartNanos);
            // Adds per second; int * 10^9 always fits into a long.
            LOG.info("Throughput: {}", ((long) throughputCount) * 1000000000L / throughputNanos);
        } finally {
            // Release everything even if a phase failed; otherwise threads would be left behind.
            try {
                if (bc != null) {
                    bc.close();
                }
            } finally {
                scheduler.shutdown();
                eventLoop.shutdownGracefully();
                executor.shutdown();
            }
        }
    }

    /**
     * Builds the payload of one benchmark entry.
     *
     * <p>The ledger id and the entry id are written as two longs at the start of the buffer; the
     * writer index is then moved to the buffer's capacity so that the whole buffer counts as
     * payload.
     *
     * @param ledger ledger id written into the first 8 bytes
     * @param entry entry id written into the following 8 bytes
     * @param size initial capacity of the buffer, in bytes
     * @return the buffer, readable from index 0 up to its capacity
     */
    private static ByteBuf newEntryPayload(long ledger, long entry, int size) {
        // A buffer fresh from Unpooled.buffer() starts with reader and writer index at 0,
        // so no explicit index reset is needed before writing.
        ByteBuf payload = Unpooled.buffer(size);
        payload.writeLong(ledger);
        payload.writeLong(entry);
        payload.writerIndex(payload.capacity());
        return payload;
    }