void TransactionMQProducerImpl::checkTransactionStateImpl()

in src/producer/TransactionMQProducerImpl.cpp [150:204]


void TransactionMQProducerImpl::checkTransactionStateImpl(const std::string& addr,
                                                          const MQMessageExt& message,
                                                          long tranStateTableOffset,
                                                          long commitLogOffset,
                                                          const std::string& msgId,
                                                          const std::string& transactionId,
                                                          const std::string& offsetMsgId) {
  LOG_DEBUG("checkTransactionStateImpl: msgId:%s, transactionId:%s", msgId.data(), transactionId.data());
  LocalTransactionState localTransactionState = UNKNOWN;
  try {
    localTransactionState = m_transactionListener->checkLocalTransaction(message);
  } catch (MQException& e) {
    LOG_INFO("checkTransactionState, checkLocalTransaction exception: %s", e.what());
  }

  EndTransactionRequestHeader* endHeader = new EndTransactionRequestHeader();
  endHeader->m_commitLogOffset = commitLogOffset;
  endHeader->m_producerGroup = getGroupName();
  endHeader->m_tranStateTableOffset = tranStateTableOffset;
  endHeader->m_fromTransactionCheck = true;

  string uniqueKey = transactionId;
  if (transactionId.empty()) {
    uniqueKey = message.getMsgId();
  }

  endHeader->m_msgId = uniqueKey;
  endHeader->m_transactionId = transactionId;
  switch (localTransactionState) {
    case COMMIT_MESSAGE:
      endHeader->m_commitOrRollback = MessageSysFlag::TransactionCommitType;
      break;
    case ROLLBACK_MESSAGE:
      endHeader->m_commitOrRollback = MessageSysFlag::TransactionRollbackType;
      LOG_WARN("when broker check, client rollback this transaction, %s", endHeader->toString().data());
      break;
    case UNKNOWN:
      endHeader->m_commitOrRollback = MessageSysFlag::TransactionNotType;
      LOG_WARN("when broker check, client does not know this transaction state, %s", endHeader->toString().data());
      break;
    default:
      break;
  }

  LOG_INFO("checkTransactionState, endTransactionOneway: uniqueKey:%s, client state:%d, end header: %s",
           uniqueKey.data(), localTransactionState, endHeader->toString().data());

  string remark;
  try {
    getFactory()->getMQClientAPIImpl()->endTransactionOneway(addr, endHeader, remark, getSessionCredentials());
  } catch (MQException& e) {
    LOG_ERROR("endTransactionOneway exception:%s", e.what());
    throw e;
  }
}