in example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java [57:190]
/**
 * Entry point of the batch producer benchmark.
 *
 * <p>Parses the command line, builds a random alphanumeric message body of
 * {@code messageSize} characters, starts the statistics reporter and the
 * producer, and then submits {@code threadCount} worker tasks that send message
 * batches in an endless loop while recording success/failure counters and
 * response times.
 *
 * <p>Options read here (fallback value passed when looking the option up):
 * {@code -n} name server address ({@code 127.0.0.1:9876}), {@code -t} topic
 * ({@code BenchmarkTest}), {@code -w} worker thread count (64), {@code -s}
 * message size (128), {@code -b} batch size (16), {@code -k} key enable
 * (false), {@code -p} property size (0), {@code -l} tag count (0), {@code -m}
 * message trace enable (false), {@code -a} ACL enable (false), {@code -c}
 * compression enable (false), {@code -ri} report interval (10000),
 * {@code -ak}/{@code -sk} ACL access/secret key, {@code -ct} compression type
 * ({@code ZLIB}), {@code -cl} compression level (5), {@code -ch} body size
 * above which messages are compressed (4096).
 *
 * @param args command line arguments
 * @throws MQClientException declared for the producer set-up calls made before
 *                           the workers are started
 */
public static void main(String[] args) throws MQClientException {
    System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());

    Options options = ServerUtil.buildCommandlineOptions(new Options());
    CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkBatchProducer", args, buildCommandlineOptions(options), new DefaultParser());
    if (null == commandLine) {
        System.exit(-1);
    }

    final String namesrv = getOptionValue(commandLine, 'n', "127.0.0.1:9876");
    final String topic = getOptionValue(commandLine, 't', "BenchmarkTest");
    final int threadCount = getOptionValue(commandLine, 'w', 64);
    final int messageSize = getOptionValue(commandLine, 's', 128);
    final int batchSize = getOptionValue(commandLine, 'b', 16);
    final boolean keyEnable = getOptionValue(commandLine, 'k', false);
    final int propertySize = getOptionValue(commandLine, 'p', 0);
    final int tagCount = getOptionValue(commandLine, 'l', 0);
    final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false);
    final boolean aclEnable = getOptionValue(commandLine, 'a', false);
    final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));
    final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;

    System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, " +
            "aclEnable: %s%n compressEnable: %s, reportInterval: %d%n",
        topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress, reportInterval);

    // Build the payload once; it is stored in the msgBody field of this class.
    StringBuilder sb = new StringBuilder(messageSize);
    for (int i = 0; i < messageSize; i++) {
        sb.append(RandomStringUtils.randomAlphanumeric(1));
    }
    msgBody = sb.toString().getBytes(StandardCharsets.UTF_8);

    final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer(reportInterval);
    statsBenchmark.start();

    // The RPC hook stays null unless ACL is enabled; keys fall back to the AclClient constants.
    RPCHook rpcHook = null;
    if (aclEnable) {
        String ak = commandLine.hasOption("ak") ? String.valueOf(commandLine.getOptionValue("ak")) : AclClient.ACL_ACCESS_KEY;
        String sk = commandLine.hasOption("sk") ? String.valueOf(commandLine.getOptionValue("sk")) : AclClient.ACL_SECRET_KEY;
        rpcHook = AclClient.getAclRPCHook(ak, sk);
    }

    final DefaultMQProducer producer = initInstance(namesrv, msgTraceEnable, rpcHook);
    if (enableCompress) {
        String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB";
        int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
        int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
        producer.setCompressType(CompressionType.of(compressType));
        producer.setCompressLevel(compressLevel);
        producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
        System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel);
    } else {
        // Compression disabled: raise the size threshold to the maximum int value.
        producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
    }
    producer.start();

    final Logger logger = LoggerFactory.getLogger(BatchProducer.class);
    final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
    for (int i = 0; i < threadCount; i++) {
        sendThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    List<Message> msgs = buildBathMessage(batchSize, topic);
                    // The worker exits when no batch could be built.
                    if (CollectionUtils.isEmpty(msgs)) {
                        return;
                    }

                    try {
                        long beginTimestamp = System.currentTimeMillis();
                        long sendSucCount = statsBenchmark.getSendMessageSuccessCount().longValue();

                        setKeys(keyEnable, msgs, String.valueOf(beginTimestamp / 1000));
                        setTags(tagCount, msgs, sendSucCount);
                        setProperties(propertySize, msgs);

                        SendResult sendResult = producer.send(msgs);
                        if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                            statsBenchmark.getSendRequestSuccessCount().increment();
                            statsBenchmark.getSendMessageSuccessCount().add(msgs.size());
                        } else {
                            statsBenchmark.getSendRequestFailedCount().increment();
                            statsBenchmark.getSendMessageFailedCount().add(msgs.size());
                        }

                        // Elapsed time is accumulated for every send that returned, whatever its status.
                        long currentRT = System.currentTimeMillis() - beginTimestamp;
                        statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT);

                        // Raise the recorded maximum RT with a CAS retry loop; stop as soon as
                        // another thread has published a value that is not smaller than ours.
                        long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
                        while (currentRT > prevMaxRT) {
                            boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
                            if (updated) {
                                break;
                            }
                            prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
                        }
                    } catch (RemotingException | InterruptedException | MQBrokerException e) {
                        // Each failed send is counted exactly once, then the worker pauses before retrying.
                        recordFailure(msgs.size(), e);
                        backOff();
                    } catch (MQClientException e) {
                        // Client-side errors are counted and logged, but the worker retries without pausing.
                        recordFailure(msgs.size(), e);
                    }
                }
            }

            /** Counts one failed request carrying {@code messageCount} messages and logs the cause. */
            private void recordFailure(int messageCount, Exception cause) {
                statsBenchmark.getSendRequestFailedCount().increment();
                statsBenchmark.getSendMessageFailedCount().add(messageCount);
                logger.error("[BENCHMARK_PRODUCER] Send Exception", cause);
            }

            /** Sleeps for three seconds before the next attempt. */
            private void backOff() {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException ignored) {
                    // An interrupt only cuts the pause short; the worker proceeds to the next batch.
                }
            }
        });
    }
}