in jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java [65:221]
/**
 * Command-line entry point for the linear write speed test.
 *
 * <p>Writes roughly {@code --bytes} bytes in chunks, round-robin across {@code --files}
 * targets. The target type is selected with exactly one of {@code --mmap},
 * {@code --channel} or {@code --log}. While running, one line is printed per reporting
 * interval with the columns {@code mb_sec}, {@code avg_latency} and {@code max_latency}
 * (latencies in milliseconds); at the end the overall MB/s figure is printed.
 *
 * @param args command-line arguments, parsed with the options declared below
 * @throws Exception if option parsing, target creation, writing or cleanup fails
 */
public static void main(String[] args) throws Exception {
    OptionParser parser = new OptionParser();
    OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
        .withRequiredArg()
        .describedAs("path")
        .ofType(String.class)
        .defaultsTo(System.getProperty("java.io.tmpdir"));
    OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
        .withRequiredArg()
        .describedAs("num_bytes")
        .ofType(Long.class);
    OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
        .withRequiredArg()
        .describedAs("num_bytes")
        .ofType(Integer.class);
    OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "REQUIRED: The size of each message in the message set.")
        .withRequiredArg()
        .describedAs("num_bytes")
        .ofType(Integer.class)
        .defaultsTo(1024);
    OptionSpec<Integer> filesOpt = parser.accepts("files", "REQUIRED: The number of logs or files.")
        .withRequiredArg()
        .describedAs("num_files")
        .ofType(Integer.class)
        .defaultsTo(1);
    OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
        .withRequiredArg()
        .describedAs("ms")
        .ofType(Long.class)
        .defaultsTo(1000L);
    OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
        .withRequiredArg()
        .describedAs("mb")
        .ofType(Integer.class)
        .defaultsTo(Integer.MAX_VALUE);
    OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
        .withRequiredArg()
        .describedAs("message_count")
        .ofType(Long.class)
        .defaultsTo(Long.MAX_VALUE);
    OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
        .withRequiredArg()
        .describedAs("codec")
        .ofType(String.class)
        .defaultsTo(CompressionType.NONE.name);
    OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
        .withRequiredArg()
        .describedAs("level")
        .ofType(Integer.class)
        .defaultsTo(0);
    OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
    OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
    OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");

    OptionSet options = parser.parse(args);
    CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt, filesOpt);

    long bytesToWrite = options.valueOf(bytesOpt);
    int bufferSize = options.valueOf(sizeOpt);
    int numFiles = options.valueOf(filesOpt);
    long reportingInterval = options.valueOf(reportingIntervalOpt);
    String dir = options.valueOf(dirOpt);
    // Widen to long before multiplying so the Integer.MAX_VALUE default cannot overflow.
    long maxThroughputBytes = options.valueOf(maxThroughputOpt) * 1024L * 1024L;
    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
    int messageSize = options.valueOf(messageSizeOpt);
    long flushInterval = options.valueOf(flushIntervalOpt);

    CompressionType compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt));
    Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType);
    int compressionLevel = options.valueOf(compressionLevelOpt);
    setupCompression(compressionType, compressionBuilder, compressionLevel);
    // Build from the configured builder so that --compression / --level actually apply to
    // the record set written in --log mode (previously Compression.NONE was hard-coded).
    Compression compression = compressionBuilder.build();

    // Random payload for the mmap/channel writers.
    ThreadLocalRandom.current().nextBytes(buffer.array());

    // As many fixed-size records as fit into one write-sized chunk.
    int numMessages = bufferSize / (messageSize + Records.LOG_OVERHEAD);
    long createTime = System.currentTimeMillis();
    List<SimpleRecord> recordsList = new ArrayList<>(numMessages);
    for (int i = 0; i < numMessages; i++) {
        recordsList.add(new SimpleRecord(createTime, null, new byte[messageSize]));
    }
    MemoryRecords messageSet = MemoryRecords.withRecords(compression, recordsList.toArray(new SimpleRecord[0]));

    Writable[] writables = new Writable[numFiles];
    KafkaScheduler scheduler = new KafkaScheduler(1);
    scheduler.startup();
    try {
        for (int i = 0; i < numFiles; i++) {
            if (options.has(mmapOpt)) {
                writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
            } else if (options.has(channelOpt)) {
                writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
            } else if (options.has(logOpt)) {
                // Random segment size between 64 MiB and 575 MiB.
                int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024;
                Properties logProperties = new Properties();
                logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
                logProperties.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(flushInterval));
                LogConfig logConfig = new LogConfig(logProperties);
                writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet);
            } else {
                System.err.println("Must specify what to write to with one of --log, --channel, or --mmap");
                Exit.exit(1);
            }
        }

        // Round down to a multiple of the file count so each target gets an equal share.
        bytesToWrite = (bytesToWrite / numFiles) * numFiles;

        System.out.printf("%10s\t%10s\t%10s%n", "mb_sec", "avg_latency", "max_latency");
        long beginTest = System.nanoTime();
        long maxLatency = 0L;
        long totalLatency = 0L;
        long count = 0L;          // lifetime write count; drives the round-robin target index
        long intervalCount = 0L;  // writes since the last report; denominator for avg_latency
        long written = 0L;        // bytes since the last report
        long totalWritten = 0L;
        long lastReport = beginTest;

        while (totalWritten + bufferSize < bytesToWrite) {
            long start = System.nanoTime();
            int writeSize = writables[(int) (count % numFiles)].write();
            long elapsed = System.nanoTime() - start;
            maxLatency = Math.max(elapsed, maxLatency);
            totalLatency += elapsed;
            written += writeSize;
            count += 1;
            intervalCount += 1;
            totalWritten += writeSize;

            if ((start - lastReport) / (1000.0 * 1000.0) > reportingInterval) {
                double elapsedSecs = (start - lastReport) / (1000.0 * 1000.0 * 1000.0);
                double mb = written / (1024.0 * 1024.0);
                // totalLatency is reset every interval, so it must be averaged over the writes
                // of this interval only, not over the lifetime count.
                System.out.printf("%10.3f\t%10.3f\t%10.3f%n",
                    mb / elapsedSecs,
                    (totalLatency / (double) intervalCount) / (1000.0 * 1000.0),
                    maxLatency / (1000.0 * 1000.0));
                lastReport = start;
                written = 0;
                maxLatency = 0L;
                totalLatency = 0L;
                intervalCount = 0L;
            } else if (written > maxThroughputBytes * (reportingInterval / 1000.0)) {
                // Throughput cap reached for this interval: sleep until the interval ends.
                long lastReportMs = lastReport / (1000 * 1000);
                long now = System.nanoTime() / (1000 * 1000);
                long sleepMs = lastReportMs + reportingInterval - now;
                if (sleepMs > 0) {
                    Thread.sleep(sleepMs);
                }
            }
        }

        double elapsedSecs = (System.nanoTime() - beginTest) / (1000.0 * 1000.0 * 1000.0);
        System.out.println((bytesToWrite / (1024.0 * 1024.0 * elapsedSecs)) + " MB per sec");
    } finally {
        // Same teardown order as before (scheduler, then writables), but now also executed
        // when target creation or a write fails part-way through.
        try {
            scheduler.shutdown();
        } finally {
            for (Writable writable : writables) {
                // Entries stay null if creation failed before reaching them.
                if (writable != null) {
                    writable.close();
                }
            }
        }
    }
}