public SendResult sendTopicMessageRequest()

in src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java [376:432]


    public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
        List<TopicConfigInfo> topicConfigInfos = examineTopicConfig(sendTopicMessageRequest.getTopic());
        String messageType = topicConfigInfos.get(0).getMessageType();
        AclClientRPCHook rpcHook = null;
        if (configure.isACLEnabled()) {
            rpcHook = new AclClientRPCHook(new SessionCredentials(
                    configure.getAccessKey(),
                    configure.getSecretKey()
            ));
        }
        if (TopicMessageType.TRANSACTION.getValue().equals(messageType)) {
            // transaction message
            TransactionListener transactionListener = new TransactionListenerImpl();

            TransactionMQProducer producer = buildTransactionMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled());
            producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
            producer.setNamesrvAddr(configure.getNamesrvAddr());
            producer.setTransactionListener(transactionListener);
            try {
                producer.start();
                Message msg = new Message(sendTopicMessageRequest.getTopic(),
                        sendTopicMessageRequest.getTag(),
                        sendTopicMessageRequest.getKey(),
                        sendTopicMessageRequest.getMessageBody().getBytes()
                );
                return producer.sendMessageInTransaction(msg, null);
            } catch (Exception e) {
                Throwables.throwIfUnchecked(e);
                throw new RuntimeException(e);
            } finally {
                waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled());
                producer.shutdown();
            }
        } else {
            // no transaction message
            DefaultMQProducer producer = null;
            producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled());
            producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
            producer.setNamesrvAddr(configure.getNamesrvAddr());
            try {
                producer.start();
                Message msg = new Message(sendTopicMessageRequest.getTopic(),
                        sendTopicMessageRequest.getTag(),
                        sendTopicMessageRequest.getKey(),
                        sendTopicMessageRequest.getMessageBody().getBytes()
                );
                return producer.send(msg);
            } catch (Exception e) {
                Throwables.throwIfUnchecked(e);
                throw new RuntimeException(e);
            } finally {
                waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled());
                producer.shutdown();
            }
        }

    }