TransactionSendResult TransactionMQProducerImpl::sendMessageInTransaction()

in src/producer/TransactionMQProducerImpl.cpp [42:99]


TransactionSendResult TransactionMQProducerImpl::sendMessageInTransaction(MQMessage& msg, void* arg) {
  if (!m_transactionListener) {
    THROW_MQEXCEPTION(MQClientException, "transactionListener is null", -1);
  }

  SendResult sendResult;
  msg.setProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED, "true");
  msg.setProperty(MQMessage::PROPERTY_PRODUCER_GROUP, getGroupName());
  try {
    sendResult = send(msg);
  } catch (MQException& e) {
    THROW_MQEXCEPTION(MQClientException, e.what(), -1);
  }

  LocalTransactionState localTransactionState = LocalTransactionState::UNKNOWN;
  switch (sendResult.getSendStatus()) {
    case SendStatus::SEND_OK:
      try {
        if (sendResult.getTransactionId() != "") {
          msg.setProperty("__transactionId__", sendResult.getTransactionId());
        }
        string transactionId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (transactionId != "") {
          msg.setTransactionId(transactionId);
        }
        LOG_DEBUG("sendMessageInTransaction, msgId:%s, transactionId:%s", sendResult.getMsgId().data(),
                  transactionId.data());
        localTransactionState = m_transactionListener->executeLocalTransaction(msg, arg);
        if (localTransactionState != LocalTransactionState::COMMIT_MESSAGE) {
          LOG_WARN("executeLocalTransaction ret not LocalTransactionState::commit, msg:%s", msg.toString().data());
        }
      } catch (MQException& e) {
        THROW_MQEXCEPTION(MQClientException, e.what(), -1);
      }
      break;
    case SendStatus::SEND_FLUSH_DISK_TIMEOUT:
    case SendStatus::SEND_FLUSH_SLAVE_TIMEOUT:
    case SendStatus::SEND_SLAVE_NOT_AVAILABLE:
      localTransactionState = LocalTransactionState::ROLLBACK_MESSAGE;
      LOG_WARN("sendMessageInTransaction, send not ok, rollback, result:%s", sendResult.toString().data());
      break;
    default:
      break;
  }

  try {
    endTransaction(sendResult, localTransactionState);
  } catch (MQException& e) {
    LOG_WARN("endTransaction exception:%s", e.what());
  }

  TransactionSendResult transactionSendResult(sendResult.getSendStatus(), sendResult.getMsgId(),
                                              sendResult.getOffsetMsgId(), sendResult.getMessageQueue(),
                                              sendResult.getQueueOffset());
  transactionSendResult.setTransactionId(msg.getTransactionId());
  transactionSendResult.setLocalTransactionState(localTransactionState);
  return transactionSendResult;
}