public static void main()

in example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java [66:198]


    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
        System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        CommandLine commandLine = ServerUtil.parseCmdLine("TransactionProducer", args, buildCommandlineOptions(options), new DefaultParser());
        TxSendConfig config = new TxSendConfig();
        config.topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
        config.threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 32;
        config.messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 2048;
        config.sendRollbackRate = commandLine.hasOption("sr") ? Double.parseDouble(commandLine.getOptionValue("sr")) : 0.0;
        config.sendUnknownRate = commandLine.hasOption("su") ? Double.parseDouble(commandLine.getOptionValue("su")) : 0.0;
        config.checkRollbackRate = commandLine.hasOption("cr") ? Double.parseDouble(commandLine.getOptionValue("cr")) : 0.0;
        config.checkUnknownRate = commandLine.hasOption("cu") ? Double.parseDouble(commandLine.getOptionValue("cu")) : 0.0;
        config.batchId = commandLine.hasOption("b") ? Long.parseLong(commandLine.getOptionValue("b")) : System.currentTimeMillis();
        config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0;
        config.aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
        config.msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
        config.reportInterval = commandLine.hasOption("ri") ? Integer.parseInt(commandLine.getOptionValue("ri")) : 10000;

        final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount);

        final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer();

        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
            new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());

        final LinkedList<Snapshot> snapshotList = new LinkedList<>();

        executorService.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                snapshotList.addLast(statsBenchmark.createSnapshot());
                while (snapshotList.size() > 10) {
                    snapshotList.removeFirst();
                }
            }
        }, 1000, 1000, TimeUnit.MILLISECONDS);

        executorService.scheduleAtFixedRate(new TimerTask() {
            private void printStats() {
                if (snapshotList.size() >= 10) {
                    Snapshot begin = snapshotList.getFirst();
                    Snapshot end = snapshotList.getLast();

                    final long sendCount = end.sendRequestSuccessCount - begin.sendRequestSuccessCount;
                    final long sendTps = (sendCount * 1000L) / (end.endTime - begin.endTime);
                    final double averageRT = (end.sendMessageTimeTotal - begin.sendMessageTimeTotal) / (double) (end.sendRequestSuccessCount - begin.sendRequestSuccessCount);

                    final long failCount = end.sendRequestFailedCount - begin.sendRequestFailedCount;
                    final long checkCount = end.checkCount - begin.checkCount;
                    final long unexpectedCheck = end.unexpectedCheckCount - begin.unexpectedCheckCount;
                    final long dupCheck = end.duplicatedCheck - begin.duplicatedCheck;

                    System.out.printf(
                        "Current Time: %s | Send TPS: %5d | Max RT(ms): %5d | AVG RT(ms): %3.1f | Send Failed: %d | Check: %d | UnexpectedCheck: %d | DuplicatedCheck: %d%n",
                        UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount,
                        unexpectedCheck, dupCheck);
                    statsBenchmark.getSendMessageMaxRT().set(0);
                }
            }

            @Override
            public void run() {
                try {
                    this.printStats();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, config.reportInterval, config.reportInterval, TimeUnit.MILLISECONDS);

        RPCHook rpcHook = null;
        if (config.aclEnable) {
            String ak = commandLine.hasOption("ak") ? String.valueOf(commandLine.getOptionValue("ak")) : AclClient.ACL_ACCESS_KEY;
            String sk = commandLine.hasOption("sk") ? String.valueOf(commandLine.getOptionValue("sk")) : AclClient.ACL_SECRET_KEY;
            rpcHook = AclClient.getAclRPCHook(ak, sk);
        }
        final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config);
        final TransactionMQProducer producer = new TransactionMQProducer(
            "benchmark_transaction_producer",
            rpcHook,
            config.msgTraceEnable,
            null);
        producer.setInstanceName(Long.toString(System.currentTimeMillis()));
        producer.setTransactionListener(transactionCheckListener);
        producer.setDefaultTopicQueueNums(1000);
        if (commandLine.hasOption('n')) {
            String ns = commandLine.getOptionValue('n');
            producer.setNamesrvAddr(ns);
        }
        producer.start();

        for (int i = 0; i < config.threadCount; i++) {
            sendThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        boolean success = false;
                        final long beginTimestamp = System.currentTimeMillis();
                        try {
                            SendResult sendResult =
                                producer.sendMessageInTransaction(buildMessage(config), null);
                            success = sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK;
                        } catch (Throwable e) {
                            success = false;
                        } finally {
                            final long currentRT = System.currentTimeMillis() - beginTimestamp;
                            statsBenchmark.getSendMessageTimeTotal().add(currentRT);
                            long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
                            while (currentRT > prevMaxRT) {
                                boolean updated = statsBenchmark.getSendMessageMaxRT()
                                    .compareAndSet(prevMaxRT, currentRT);
                                if (updated)
                                    break;

                                prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
                            }
                            if (success) {
                                statsBenchmark.getSendRequestSuccessCount().increment();
                            } else {
                                statsBenchmark.getSendRequestFailedCount().increment();
                            }
                            if (config.sendInterval > 0) {
                                try {
                                    Thread.sleep(config.sendInterval);
                                } catch (InterruptedException e) {
                                }
                            }
                        }
                    }
                }
            });
        }
    }