in spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java [159:203]
protected void handleMessageInternal(Message<?> message) {
try {
org.apache.rocketmq.common.message.Message mqMessage = RocketMQMessageConverterSupport
.convertMessage2MQ(destination.getName(), message);
SendResult sendResult;
if (defaultMQProducer instanceof TransactionMQProducer translateMQProducer) {
TransactionListener transactionListener = RocketMQBeanContainerCache
.getBean(mqProducerProperties.getTransactionListener(),
TransactionListener.class);
if (transactionListener == null) {
throw new MessagingException(
"TransactionMQProducer must have a TransactionListener !!! ");
}
translateMQProducer.setTransactionListener(transactionListener);
if (log.isDebugEnabled()) {
log.debug("send transaction message ->{}", mqMessage);
}
sendResult = defaultMQProducer.sendMessageInTransaction(mqMessage,
message.getHeaders().get(RocketMQConst.USER_TRANSACTIONAL_ARGS));
}
else {
if (log.isDebugEnabled()) {
log.debug("send message ->{}", mqMessage);
}
sendResult = this.send(mqMessage, this.messageQueueSelector,
message.getHeaders(), message);
}
if (log.isDebugEnabled()) {
log.debug("the message has sent,message={},sendResult={}", mqMessage,
sendResult);
}
if (sendResult == null
|| !SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.error("message send fail.SendStatus is not OK.the message={}",
mqMessage);
this.doFail(message, new MessagingException(
"message send fail.SendStatus is not OK."));
}
}
catch (Exception e) {
log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage(),
e);
this.doFail(message, e);
}
}