in tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java [48:187]
public static void main(String[] args) throws Exception {
ArgumentParser parser = argParser();
try {
Namespace res = parser.parseArgs(args);
/* parse args */
String topicName = res.getString("topic");
long numRecords = res.getLong("numRecords");
Integer recordSize = res.getInt("recordSize");
int throughput = res.getInt("throughput");
List<String> producerProps = res.getList("producerConfig");
String producerConfig = res.getString("producerConfigFile");
String payloadFilePath = res.getString("payloadFile");
String transactionalId = res.getString("transactionalId");
boolean shouldPrintMetrics = res.getBoolean("printMetrics");
long transactionDurationMs = res.getLong("transactionDurationMs");
boolean transactionsEnabled = 0 < transactionDurationMs;
// since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here.
String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n" : res.getString("payloadDelimiter");
if (producerProps == null && producerConfig == null) {
throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser);
}
List<byte[]> payloadByteList = new ArrayList<>();
if (payloadFilePath != null) {
Path path = Paths.get(payloadFilePath);
System.out.println("Reading payloads from: " + path.toAbsolutePath());
if (Files.notExists(path) || Files.size(path) == 0) {
throw new IllegalArgumentException("File does not exist or empty file provided.");
}
String[] payloadList = new String(Files.readAllBytes(path), "UTF-8").split(payloadDelimiter);
System.out.println("Number of messages read: " + payloadList.length);
for (String payload : payloadList) {
payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
}
}
Properties props = new Properties();
if (producerConfig != null) {
props.putAll(Utils.loadProps(producerConfig));
}
if (producerProps != null)
for (String prop : producerProps) {
String[] pieces = prop.split("=");
if (pieces.length != 2)
throw new IllegalArgumentException("Invalid property: " + prop);
props.put(pieces[0], pieces[1]);
}
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
if (transactionsEnabled)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
if (transactionsEnabled)
producer.initTransactions();
/* setup perf test */
byte[] payload = null;
Random random = new Random(0);
if (recordSize != null) {
payload = new byte[recordSize];
for (int i = 0; i < payload.length; ++i)
payload[i] = (byte) (random.nextInt(26) + 65);
}
ProducerRecord<byte[], byte[]> record;
Stats stats = new Stats(numRecords, 5000);
long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
int currentTransactionSize = 0;
long transactionStartTime = 0;
for (int i = 0; i < numRecords; i++) {
if (transactionsEnabled && currentTransactionSize == 0) {
producer.beginTransaction();
transactionStartTime = System.currentTimeMillis();
}
if (payloadFilePath != null) {
payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
}
record = new ProducerRecord<>(topicName, payload);
long sendStartMs = System.currentTimeMillis();
Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
producer.send(record, cb);
currentTransactionSize++;
if (transactionsEnabled && transactionDurationMs <= (sendStartMs - transactionStartTime)) {
producer.commitTransaction();
currentTransactionSize = 0;
}
if (throttler.shouldThrottle(i, sendStartMs)) {
throttler.throttle();
}
}
if (transactionsEnabled && currentTransactionSize != 0)
producer.commitTransaction();
if (!shouldPrintMetrics) {
producer.close();
/* print final results */
stats.printTotal();
} else {
// Make sure all messages are sent before printing out the stats and the metrics
// We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py
// expects this class to work with older versions of the client jar that don't support flush().
producer.flush();
/* print final results */
stats.printTotal();
/* print out metrics */
ToolsUtils.printMetrics(producer.metrics());
producer.close();
}
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
Exit.exit(0);
} else {
parser.handleError(e);
Exit.exit(1);
}
}
}