in ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java [126:158]
/**
 * Sends {@code message} through the underlying transactional producer, using
 * {@code executer} as the local transaction branch.
 *
 * <p>The message is converted with {@code ONSUtil.msgConvert} and handed to
 * {@code transactionMQProducer.sendMessageInTransaction} together with an adapter that:
 * <ol>
 *   <li>copies the {@code Constants.TRANSACTION_ID} property of the converted message
 *       onto {@code message} as its message ID,</li>
 *   <li>invokes {@code executer.execute(message, arg)}, and</li>
 *   <li>maps {@code CommitTransaction} to {@code COMMIT_MESSAGE},
 *       {@code RollbackTransaction} to {@code ROLLBACK_MESSAGE}, and every other
 *       outcome (including {@code null}) to {@code UNKNOW}.</li>
 * </ol>
 *
 * @param message  the message to send; its message ID is overwritten before the
 *                 executer runs
 * @param executer callback that performs the local transaction
 * @param arg      opaque argument passed to the underlying producer, which supplies
 *                 the argument the adapter forwards to {@code executer}
 * @return a result carrying the message ID and topic reported by the underlying send
 * @throws RuntimeException wrapping any {@link Exception} raised by the underlying
 *                          send, or raised directly when the reported local
 *                          transaction state is {@code ROLLBACK_MESSAGE}
 */
public SendResult send(final Message message, final LocalTransactionExecuter executer, Object arg) {
    this.checkONSProducerServiceState(this.transactionMQProducer.getDefaultMQProducerImpl());
    org.apache.rocketmq.common.message.Message rocketMessage = ONSUtil.msgConvert(message);

    // Adapter from the RocketMQ callback interface to the ONS executer.
    org.apache.rocketmq.client.producer.LocalTransactionExecuter branchAdapter =
        new org.apache.rocketmq.client.producer.LocalTransactionExecuter() {
            @Override
            public LocalTransactionState executeLocalTransactionBranch(
                org.apache.rocketmq.common.message.Message rocketMsg,
                Object branchArg) {
                // Expose the transaction id to the executer as the ONS message ID.
                message.setMsgID(rocketMsg.getProperty(Constants.TRANSACTION_ID));

                TransactionStatus outcome = executer.execute(message, branchArg);
                if (outcome == TransactionStatus.CommitTransaction) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                if (outcome == TransactionStatus.RollbackTransaction) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                // Anything else, including a null status, is reported as unknown.
                return LocalTransactionState.UNKNOW;
            }
        };

    org.apache.rocketmq.client.producer.TransactionSendResult txResult;
    try {
        txResult = this.transactionMQProducer.sendMessageInTransaction(rocketMessage, branchAdapter, arg);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    // A rolled-back local branch is surfaced to the caller as a failure.
    if (LocalTransactionState.ROLLBACK_MESSAGE == txResult.getLocalTransactionState()) {
        throw new RuntimeException("local transaction branch failed ,so transaction rollback");
    }

    SendResult result = new SendResult();
    result.setMessageId(txResult.getMsgId());
    result.setTopic(txResult.getMessageQueue().getTopic());
    return result;
}