in src/producer/TransactionMQProducerImpl.cpp [150:204]
void TransactionMQProducerImpl::checkTransactionStateImpl(const std::string& addr,
const MQMessageExt& message,
long tranStateTableOffset,
long commitLogOffset,
const std::string& msgId,
const std::string& transactionId,
const std::string& offsetMsgId) {
LOG_DEBUG("checkTransactionStateImpl: msgId:%s, transactionId:%s", msgId.data(), transactionId.data());
LocalTransactionState localTransactionState = UNKNOWN;
try {
localTransactionState = m_transactionListener->checkLocalTransaction(message);
} catch (MQException& e) {
LOG_INFO("checkTransactionState, checkLocalTransaction exception: %s", e.what());
}
EndTransactionRequestHeader* endHeader = new EndTransactionRequestHeader();
endHeader->m_commitLogOffset = commitLogOffset;
endHeader->m_producerGroup = getGroupName();
endHeader->m_tranStateTableOffset = tranStateTableOffset;
endHeader->m_fromTransactionCheck = true;
string uniqueKey = transactionId;
if (transactionId.empty()) {
uniqueKey = message.getMsgId();
}
endHeader->m_msgId = uniqueKey;
endHeader->m_transactionId = transactionId;
switch (localTransactionState) {
case COMMIT_MESSAGE:
endHeader->m_commitOrRollback = MessageSysFlag::TransactionCommitType;
break;
case ROLLBACK_MESSAGE:
endHeader->m_commitOrRollback = MessageSysFlag::TransactionRollbackType;
LOG_WARN("when broker check, client rollback this transaction, %s", endHeader->toString().data());
break;
case UNKNOWN:
endHeader->m_commitOrRollback = MessageSysFlag::TransactionNotType;
LOG_WARN("when broker check, client does not know this transaction state, %s", endHeader->toString().data());
break;
default:
break;
}
LOG_INFO("checkTransactionState, endTransactionOneway: uniqueKey:%s, client state:%d, end header: %s",
uniqueKey.data(), localTransactionState, endHeader->toString().data());
string remark;
try {
getFactory()->getMQClientAPIImpl()->endTransactionOneway(addr, endHeader, remark, getSessionCredentials());
} catch (MQException& e) {
LOG_ERROR("endTransactionOneway exception:%s", e.what());
throw e;
}
}