protected void handleMessageInternal()

in spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java [159:203]


	protected void handleMessageInternal(Message<?> message) {
		try {
			org.apache.rocketmq.common.message.Message mqMessage = RocketMQMessageConverterSupport
					.convertMessage2MQ(destination.getName(), message);
			SendResult sendResult;
			if (defaultMQProducer instanceof TransactionMQProducer translateMQProducer) {
				TransactionListener transactionListener = RocketMQBeanContainerCache
						.getBean(mqProducerProperties.getTransactionListener(),
								TransactionListener.class);
				if (transactionListener == null) {
					throw new MessagingException(
							"TransactionMQProducer must have a TransactionListener !!! ");
				}
				translateMQProducer.setTransactionListener(transactionListener);
				if (log.isDebugEnabled()) {
					log.debug("send transaction message ->{}", mqMessage);
				}
				sendResult = defaultMQProducer.sendMessageInTransaction(mqMessage,
						message.getHeaders().get(RocketMQConst.USER_TRANSACTIONAL_ARGS));
			}
			else {
				if (log.isDebugEnabled()) {
					log.debug("send message ->{}", mqMessage);
				}
				sendResult = this.send(mqMessage, this.messageQueueSelector,
						message.getHeaders(), message);
			}
			if (log.isDebugEnabled()) {
				log.debug("the message has sent,message={},sendResult={}", mqMessage,
						sendResult);
			}
			if (sendResult == null
					|| !SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
				log.error("message send fail.SendStatus is not OK.the message={}",
						mqMessage);
				this.doFail(message, new MessagingException(
						"message send fail.SendStatus is not OK."));
			}
		}
		catch (Exception e) {
			log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage(),
					e);
			this.doFail(message, e);
		}
	}