in src/producer/TransactionMQProducerImpl.cpp [42:99]
/**
 * Sends `msg` as a prepared transactional message, runs the local transaction
 * through the registered listener, and reports the outcome via endTransaction().
 *
 * Flow:
 *   1. Tag the message with the transaction-prepared flag and producer group,
 *      then send it.
 *   2. Depending on the send status, either execute the local transaction
 *      (SEND_OK), force a rollback (flush/slave failures), or leave the state
 *      as UNKNOWN (any other status).
 *   3. Call endTransaction() with the resulting state; an MQException raised
 *      there is only logged.
 *
 * @param msg Message to send. It is modified: properties are set on it and,
 *            when its unique-client-message-id property is non-empty, that
 *            value becomes its transaction id.
 * @param arg Opaque pointer handed unchanged to executeLocalTransaction().
 * @return A TransactionSendResult assembled from the send result, the
 *         message's transaction id and the local transaction state.
 * @throws MQClientException (raised through THROW_MQEXCEPTION) when no
 *         transaction listener is set, when send() raises MQException, or
 *         when MQException is raised inside the SEND_OK branch.
 */
TransactionSendResult TransactionMQProducerImpl::sendMessageInTransaction(MQMessage& msg, void* arg) {
  if (!m_transactionListener) {
    THROW_MQEXCEPTION(MQClientException, "transactionListener is null", -1);
  }

  // Phase 1: mark the message as prepared and send it.
  SendResult preparedResult;
  msg.setProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED, "true");
  msg.setProperty(MQMessage::PROPERTY_PRODUCER_GROUP, getGroupName());
  try {
    preparedResult = send(msg);
  } catch (MQException& sendError) {
    THROW_MQEXCEPTION(MQClientException, sendError.what(), -1);
  }

  // Phase 2: derive the local transaction state from the send status.
  LocalTransactionState txState = LocalTransactionState::UNKNOWN;
  const SendStatus status = preparedResult.getSendStatus();

  if (status == SendStatus::SEND_OK) {
    try {
      if (!preparedResult.getTransactionId().empty()) {
        msg.setProperty("__transactionId__", preparedResult.getTransactionId());
      }

      // The client-side unique message id, when present, doubles as the
      // message's transaction id.
      string uniqueKey = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
      if (!uniqueKey.empty()) {
        msg.setTransactionId(uniqueKey);
      }
      LOG_DEBUG("sendMessageInTransaction, msgId:%s, transactionId:%s", preparedResult.getMsgId().data(),
                uniqueKey.data());

      txState = m_transactionListener->executeLocalTransaction(msg, arg);
      if (txState != LocalTransactionState::COMMIT_MESSAGE) {
        LOG_WARN("executeLocalTransaction ret not LocalTransactionState::commit, msg:%s", msg.toString().data());
      }
    } catch (MQException& localError) {
      // NOTE(review): assuming THROW_MQEXCEPTION throws, endTransaction() below
      // is never reached on this path.
      THROW_MQEXCEPTION(MQClientException, localError.what(), -1);
    }
  } else if (status == SendStatus::SEND_FLUSH_DISK_TIMEOUT || status == SendStatus::SEND_FLUSH_SLAVE_TIMEOUT ||
             status == SendStatus::SEND_SLAVE_NOT_AVAILABLE) {
    txState = LocalTransactionState::ROLLBACK_MESSAGE;
    LOG_WARN("sendMessageInTransaction, send not ok, rollback, result:%s", preparedResult.toString().data());
  }
  // Any other status leaves txState as UNKNOWN.

  // Phase 3: report the outcome; a failure here is logged, not propagated.
  try {
    endTransaction(preparedResult, txState);
  } catch (MQException& endError) {
    LOG_WARN("endTransaction exception:%s", endError.what());
  }

  TransactionSendResult outcome(preparedResult.getSendStatus(), preparedResult.getMsgId(),
                                preparedResult.getOffsetMsgId(), preparedResult.getMessageQueue(),
                                preparedResult.getQueueOffset());
  outcome.setTransactionId(msg.getTransactionId());
  outcome.setLocalTransactionState(txState);
  return outcome;
}