public static void main()

in example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java [57:190]


    /**
     * Entry point of the batch-producer benchmark.
     *
     * <p>Parses the command line, prepares one shared random message body, starts the
     * statistics object and the producer, then submits {@code threadCount} workers to a
     * fixed thread pool. Each worker loops forever: it builds a batch, decorates it with
     * keys/tags/properties, sends it and records the outcome. The pool is never shut down
     * here, so this method returns while the workers keep running.
     *
     * <p>Options read here (the second value is the default passed to the lookup):
     * {@code n} name server ("127.0.0.1:9876"), {@code t} topic ("BenchmarkTest"),
     * {@code w} worker threads (64), {@code s} message body size (128), {@code b} batch
     * size (16), {@code k} key enable (false), {@code p} property size (0), {@code l} tag
     * count (0), {@code m} message trace (false), {@code a} ACL (false), {@code c}
     * compression (off unless present and "true"), {@code ri} report interval (10000),
     * {@code ak}/{@code sk} ACL credentials, {@code ct}/{@code cl}/{@code ch} compression
     * type ("ZLIB"), level (5) and body-size threshold (4096).
     *
     * @param args raw command line arguments
     * @throws MQClientException declared by the signature; presumably raised while creating
     *                           or starting the producer (confirm against {@code initInstance})
     */
    public static void main(String[] args) throws MQClientException {
        System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());

        Options options = ServerUtil.buildCommandlineOptions(new Options());
        CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkBatchProducer", args, buildCommandlineOptions(options), new DefaultParser());
        if (null == commandLine) {
            System.exit(-1);
        }

        final String namesrv = getOptionValue(commandLine, 'n', "127.0.0.1:9876");
        final String topic = getOptionValue(commandLine, 't', "BenchmarkTest");
        final int threadCount = getOptionValue(commandLine, 'w', 64);
        final int messageSize = getOptionValue(commandLine, 's', 128);
        final int batchSize = getOptionValue(commandLine, 'b', 16);
        final boolean keyEnable = getOptionValue(commandLine, 'k', false);
        final int propertySize = getOptionValue(commandLine, 'p', 0);
        final int tagCount = getOptionValue(commandLine, 'l', 0);
        final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false);
        final boolean aclEnable = getOptionValue(commandLine, 'a', false);
        final boolean enableCompress = commandLine.hasOption('c') && Boolean.parseBoolean(commandLine.getOptionValue('c'));
        final int reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;

        System.out.printf("topic: %s, threadCount: %d, messageSize: %d, batchSize: %d, keyEnable: %s, propertySize: %d, tagCount: %d, traceEnable: %s, " +
                "aclEnable: %s%n compressEnable: %s, reportInterval: %d%n",
            topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, enableCompress, reportInterval);

        // One random alphanumeric payload of the requested size, generated in a single call
        // and stored in the class-level msgBody field.
        msgBody = RandomStringUtils.randomAlphanumeric(messageSize).getBytes(StandardCharsets.UTF_8);

        final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer(reportInterval);
        statsBenchmark.start();

        // The ACL hook is only built when ACL is enabled; otherwise the producer gets null.
        RPCHook rpcHook = null;
        if (aclEnable) {
            String ak = commandLine.hasOption("ak") ? String.valueOf(commandLine.getOptionValue("ak")) : AclClient.ACL_ACCESS_KEY;
            String sk = commandLine.hasOption("sk") ? String.valueOf(commandLine.getOptionValue("sk")) : AclClient.ACL_SECRET_KEY;
            rpcHook = AclClient.getAclRPCHook(ak, sk);
        }

        final DefaultMQProducer producer = initInstance(namesrv, msgTraceEnable, rpcHook);

        if (enableCompress) {
            String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB";
            int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
            int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
            producer.setCompressType(CompressionType.of(compressType));
            producer.setCompressLevel(compressLevel);
            producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
            System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel);
        } else {
            // Compression disabled: raise the size threshold to the maximum int value.
            producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
        }

        producer.start();

        final Logger logger = LoggerFactory.getLogger(BatchProducer.class);
        final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            sendThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        List<Message> msgs = buildBathMessage(batchSize, topic);

                        // An empty batch ends this worker.
                        if (CollectionUtils.isEmpty(msgs)) {
                            return;
                        }

                        try {
                            long beginTimestamp = System.currentTimeMillis();
                            long sendSucCount = statsBenchmark.getSendMessageSuccessCount().longValue();

                            setKeys(keyEnable, msgs, String.valueOf(beginTimestamp / 1000));
                            setTags(tagCount, msgs, sendSucCount);
                            setProperties(propertySize, msgs);
                            SendResult sendResult = producer.send(msgs);
                            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                                statsBenchmark.getSendRequestSuccessCount().increment();
                                statsBenchmark.getSendMessageSuccessCount().add(msgs.size());
                            } else {
                                statsBenchmark.getSendRequestFailedCount().increment();
                                statsBenchmark.getSendMessageFailedCount().add(msgs.size());
                            }
                            long currentRT = System.currentTimeMillis() - beginTimestamp;
                            statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT);

                            // Raise the recorded maximum response time with a compare-and-set
                            // retry loop, so a larger value written by another worker is not lost.
                            long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
                            while (currentRT > prevMaxRT) {
                                boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
                                if (updated) {
                                    break;
                                }

                                prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
                            }
                        } catch (RemotingException | MQBrokerException e) {
                            // Transport and broker failures share the same handling:
                            // count once, log, then back off before the next batch.
                            statsBenchmark.getSendRequestFailedCount().increment();
                            statsBenchmark.getSendMessageFailedCount().add(msgs.size());
                            logger.error("[BENCHMARK_PRODUCER] Send Exception", e);

                            try {
                                Thread.sleep(3000);
                            } catch (InterruptedException ignored) {
                                // Best-effort back-off: an interrupted pause just ends early.
                            }
                        } catch (InterruptedException e) {
                            // Count the failed request exactly once. The previous code
                            // incremented both failure counters twice for a single
                            // interrupted send, inflating the reported failure numbers.
                            statsBenchmark.getSendRequestFailedCount().increment();
                            statsBenchmark.getSendMessageFailedCount().add(msgs.size());
                            logger.error("[BENCHMARK_PRODUCER] Send Exception", e);

                            // The worker keeps running after an interrupt (existing behaviour):
                            // pause, then continue with the next batch.
                            try {
                                Thread.sleep(3000);
                            } catch (InterruptedException ignored) {
                                // Best-effort back-off: an interrupted pause just ends early.
                            }
                        } catch (MQClientException e) {
                            // Client-side failure: counted and logged, retried without back-off.
                            statsBenchmark.getSendRequestFailedCount().increment();
                            statsBenchmark.getSendMessageFailedCount().add(msgs.size());
                            logger.error("[BENCHMARK_PRODUCER] Send Exception", e);
                        }
                    }
                }
            });
        }
    }