public static void main()

in jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java [65:221]


    public static void main(String[] args) throws Exception {
        OptionParser parser = new OptionParser();

        OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
            .withRequiredArg()
            .describedAs("path")
            .ofType(String.class)
            .defaultsTo(System.getProperty("java.io.tmpdir"));

        OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
            .withRequiredArg()
            .describedAs("num_bytes")
            .ofType(Long.class);

        OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
            .withRequiredArg()
            .describedAs("num_bytes")
            .ofType(Integer.class);

        OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "REQUIRED: The size of each message in the message set.")
            .withRequiredArg()
            .describedAs("num_bytes")
            .ofType(Integer.class)
            .defaultsTo(1024);

        OptionSpec<Integer> filesOpt = parser.accepts("files", "REQUIRED: The number of logs or files.")
            .withRequiredArg()
            .describedAs("num_files")
            .ofType(Integer.class)
            .defaultsTo(1);

        OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
            .withRequiredArg()
            .describedAs("ms")
            .ofType(Long.class)
            .defaultsTo(1000L);

        OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
            .withRequiredArg()
            .describedAs("mb")
            .ofType(Integer.class)
            .defaultsTo(Integer.MAX_VALUE);

        OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
            .withRequiredArg()
            .describedAs("message_count")
            .ofType(Long.class)
            .defaultsTo(Long.MAX_VALUE);

        OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
            .withRequiredArg()
            .describedAs("codec")
            .ofType(String.class)
            .defaultsTo(CompressionType.NONE.name);

        OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
            .withRequiredArg()
            .describedAs("level")
            .ofType(Integer.class)
            .defaultsTo(0);

        OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
        OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
        OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
        OptionSet options = parser.parse(args);
        CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt, filesOpt);

        long bytesToWrite = options.valueOf(bytesOpt);
        int bufferSize = options.valueOf(sizeOpt);
        int numFiles = options.valueOf(filesOpt);
        long reportingInterval = options.valueOf(reportingIntervalOpt);
        String dir = options.valueOf(dirOpt);
        long maxThroughputBytes = options.valueOf(maxThroughputOpt) * 1024L * 1024L;
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        int messageSize = options.valueOf(messageSizeOpt);
        long flushInterval = options.valueOf(flushIntervalOpt);
        CompressionType compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt));
        Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType);
        int compressionLevel = options.valueOf(compressionLevelOpt);

        setupCompression(compressionType, compressionBuilder, compressionLevel);

        ThreadLocalRandom.current().nextBytes(buffer.array());
        int numMessages = bufferSize / (messageSize + Records.LOG_OVERHEAD);
        long createTime = System.currentTimeMillis();

        List<SimpleRecord> recordsList = new ArrayList<>();
        for (int i = 0; i < numMessages; i++) {
            recordsList.add(new SimpleRecord(createTime, null, new byte[messageSize]));
        }

        MemoryRecords messageSet = MemoryRecords.withRecords(Compression.NONE, recordsList.toArray(new SimpleRecord[0]));
        Writable[] writables = new Writable[numFiles];
        KafkaScheduler scheduler = new KafkaScheduler(1);
        scheduler.startup();

        for (int i = 0; i < numFiles; i++) {
            if (options.has(mmapOpt)) {
                writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
            } else if (options.has(channelOpt)) {
                writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
            } else if (options.has(logOpt)) {
                int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024;
                Properties logProperties = new Properties();
                logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
                logProperties.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(flushInterval));
                LogConfig logConfig = new LogConfig(logProperties);
                writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet);
            } else {
                System.err.println("Must specify what to write to with one of --log, --channel, or --mmap");
                Exit.exit(1);
            }
        }
        bytesToWrite = (bytesToWrite / numFiles) * numFiles;

        System.out.printf("%10s\t%10s\t%10s%n", "mb_sec", "avg_latency", "max_latency");

        long beginTest = System.nanoTime();
        long maxLatency = 0L;
        long totalLatency = 0L;
        long count = 0L;
        long written = 0L;
        long totalWritten = 0L;
        long lastReport = beginTest;

        while (totalWritten + bufferSize < bytesToWrite) {
            long start = System.nanoTime();
            int writeSize = writables[(int) (count % numFiles)].write();
            long elapsed = System.nanoTime() - start;
            maxLatency = Math.max(elapsed, maxLatency);
            totalLatency += elapsed;
            written += writeSize;
            count += 1;
            totalWritten += writeSize;
            if ((start - lastReport) / (1000.0 * 1000.0) > reportingInterval) {
                double elapsedSecs = (start - lastReport) / (1000.0 * 1000.0 * 1000.0);
                double mb = written / (1024.0 * 1024.0);
                System.out.printf("%10.3f\t%10.3f\t%10.3f%n", mb / elapsedSecs, (totalLatency / (double) count) / (1000.0 * 1000.0), maxLatency / (1000.0 * 1000.0));
                lastReport = start;
                written = 0;
                maxLatency = 0L;
                totalLatency = 0L;
            } else if (written > maxThroughputBytes * (reportingInterval / 1000.0)) {
                long lastReportMs = lastReport / (1000 * 1000);
                long now = System.nanoTime() / (1000 * 1000);
                long sleepMs = lastReportMs + reportingInterval - now;
                if (sleepMs > 0)
                    Thread.sleep(sleepMs);
            }
        }
        double elapsedSecs = (System.nanoTime() - beginTest) / (1000.0 * 1000.0 * 1000.0);
        System.out.println((bytesToWrite / (1024.0 * 1024.0 * elapsedSecs)) + " MB per sec");
        scheduler.shutdown();
        for (Writable writable : writables) {
            writable.close();
        }
    }