SendResult DefaultMQProducerImpl::sendKernelImpl()

in src/producer/DefaultMQProducerImpl.cpp [437:511]


SendResult DefaultMQProducerImpl::sendKernelImpl(MQMessage& msg,
                                                 const MQMessageQueue& mq,
                                                 int communicationMode,
                                                 SendCallback* sendCallback) {
  string brokerAddr = getFactory()->findBrokerAddressInPublish(mq.getBrokerName());

  if (brokerAddr.empty()) {
    getFactory()->tryToFindTopicPublishInfo(mq.getTopic(), getSessionCredentials());
    brokerAddr = getFactory()->findBrokerAddressInPublish(mq.getBrokerName());
  }

  if (!brokerAddr.empty()) {
    boost::scoped_ptr<SendMessageContext> pSendMesgContext(new SendMessageContext());
    try {
      bool isBatchMsg = std::type_index(typeid(msg)) == std::type_index(typeid(BatchMessage));
      // msgId is produced by client, offsetMsgId produced by broker. (same with java sdk)
      if (!isBatchMsg) {
        string unique_id = StringIdMaker::getInstance().createUniqID();
        msg.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id);

        // batch does not support compressing right now,
        tryToCompressMessage(msg);
      }

      LOG_DEBUG("produce before:%s to %s", msg.toString().c_str(), mq.toString().c_str());
      if (!isMessageTraceTopic(msg.getTopic()) && getMessageTrace() && hasSendMessageHook()) {
        pSendMesgContext.reset(new SendMessageContext);
        pSendMesgContext->setDefaultMqProducer(this);
        pSendMesgContext->setProducerGroup(NameSpaceUtil::withoutNameSpace(getGroupName(), getNameSpace()));
        pSendMesgContext->setCommunicationMode(static_cast<CommunicationMode>(communicationMode));
        pSendMesgContext->setBornHost(UtilAll::getLocalAddress());
        pSendMesgContext->setBrokerAddr(brokerAddr);
        pSendMesgContext->setMessage(msg);
        pSendMesgContext->setMessageQueue(mq);
        pSendMesgContext->setMsgType(TRACE_NORMAL_MSG);
        pSendMesgContext->setNameSpace(getNameSpace());
        string tranMsg = msg.getProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED);
        if (!tranMsg.empty() && tranMsg == "true") {
          pSendMesgContext->setMsgType(TRACE_TRANS_HALF_MSG);
        }

        if (msg.getProperty("__STARTDELIVERTIME") != "" ||
            msg.getProperty(MQMessage::PROPERTY_DELAY_TIME_LEVEL) != "") {
          pSendMesgContext->setMsgType(TRACE_DELAY_MSG);
        }
        executeSendMessageHookBefore(pSendMesgContext.get());
      }
      SendMessageRequestHeader requestHeader;
      requestHeader.producerGroup = getGroupName();
      requestHeader.topic = (msg.getTopic());
      requestHeader.defaultTopic = DEFAULT_TOPIC;
      requestHeader.defaultTopicQueueNums = 4;
      requestHeader.queueId = (mq.getQueueId());
      requestHeader.sysFlag = (msg.getSysFlag());
      requestHeader.bornTimestamp = UtilAll::currentTimeMillis();
      requestHeader.flag = (msg.getFlag());
      requestHeader.consumeRetryTimes = 16;
      requestHeader.batch = isBatchMsg;
      requestHeader.properties = (MQDecoder::messageProperties2String(msg.getProperties()));

      SendResult sendResult = getFactory()->getMQClientAPIImpl()->sendMessage(
          brokerAddr, mq.getBrokerName(), msg, requestHeader, getSendMsgTimeout(), getRetryTimes4Async(),
          communicationMode, sendCallback, getSessionCredentials());
      if (!isMessageTraceTopic(msg.getTopic()) && getMessageTrace() && hasSendMessageHook() && sendCallback == NULL &&
          communicationMode == ComMode_SYNC) {
        pSendMesgContext->setSendResult(sendResult);
        executeSendMessageHookAfter(pSendMesgContext.get());
      }
      return sendResult;
    } catch (MQException& e) {
      throw e;
    }
  }
  THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
}