in src/producer/DefaultMQProducerImpl.cpp [437:511]
/**
 * Sends a single message (or a batch message) to the broker that owns `mq`.
 *
 * Steps, in order:
 *  1. Look up the broker address for mq's broker name; if it is unknown, refresh the
 *     topic publish info once and look it up again.
 *  2. For non-batch messages, stamp a client-generated unique id on the message and
 *     try to compress it.
 *  3. If message tracing is on, a send hook is registered and the topic is not the
 *     trace topic itself, populate a SendMessageContext and run the "before" hooks.
 *  4. Build the SendMessageRequestHeader and hand the message to MQClientAPIImpl.
 *  5. Run the "after" hooks, but only for synchronous sends without a callback.
 *
 * @param msg               message to send; its properties may be modified (unique id,
 *                          compression).
 * @param mq                target message queue (broker name, topic and queue id are read).
 * @param communicationMode communication mode value, forwarded to the client API and
 *                          cast to CommunicationMode for the trace context.
 * @param sendCallback      callback forwarded to the client API; may be NULL.
 * @return the SendResult returned by MQClientAPIImpl::sendMessage.
 * @throws MQClientException if no broker address can be resolved for mq's broker name.
 *         Exceptions raised while sending propagate to the caller unchanged, keeping
 *         their concrete (dynamic) type.
 */
SendResult DefaultMQProducerImpl::sendKernelImpl(MQMessage& msg,
                                                 const MQMessageQueue& mq,
                                                 int communicationMode,
                                                 SendCallback* sendCallback) {
  string brokerAddr = getFactory()->findBrokerAddressInPublish(mq.getBrokerName());
  if (brokerAddr.empty()) {
    // Unknown broker: refresh the route for this topic once, then retry the lookup.
    getFactory()->tryToFindTopicPublishInfo(mq.getTopic(), getSessionCredentials());
    brokerAddr = getFactory()->findBrokerAddressInPublish(mq.getBrokerName());
  }
  if (brokerAddr.empty()) {
    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
  }

  // NOTE: there is intentionally no try/catch around the send path. The previous
  // `catch (MQException& e) { throw e; }` re-threw a *copy* of static type MQException,
  // slicing away the derived exception type. Letting exceptions propagate keeps it.

  const bool isBatchMsg = (typeid(msg) == typeid(BatchMessage));
  // msgId is produced by client, offsetMsgId produced by broker. (same with java sdk)
  if (!isBatchMsg) {
    string unique_id = StringIdMaker::getInstance().createUniqID();
    msg.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id);
    // batch does not support compressing right now,
    tryToCompressMessage(msg);
  }
  LOG_DEBUG("produce before:%s to %s", msg.toString().c_str(), mq.toString().c_str());

  // Evaluate the hook condition once so the "before" and "after" hooks are always
  // paired and always operate on the same, fully populated context.
  const bool traceHooksEnabled =
      !isMessageTraceTopic(msg.getTopic()) && getMessageTrace() && hasSendMessageHook();

  // The context is only allocated when the hooks will actually run.
  boost::scoped_ptr<SendMessageContext> pSendMesgContext;
  if (traceHooksEnabled) {
    pSendMesgContext.reset(new SendMessageContext);
    pSendMesgContext->setDefaultMqProducer(this);
    pSendMesgContext->setProducerGroup(NameSpaceUtil::withoutNameSpace(getGroupName(), getNameSpace()));
    pSendMesgContext->setCommunicationMode(static_cast<CommunicationMode>(communicationMode));
    pSendMesgContext->setBornHost(UtilAll::getLocalAddress());
    pSendMesgContext->setBrokerAddr(brokerAddr);
    pSendMesgContext->setMessage(msg);
    pSendMesgContext->setMessageQueue(mq);
    pSendMesgContext->setMsgType(TRACE_NORMAL_MSG);
    pSendMesgContext->setNameSpace(getNameSpace());

    // Refine the trace message type; the delay check runs last and therefore wins
    // when a message carries both transaction and delay properties.
    const string& tranMsg = msg.getProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED);
    if (tranMsg == "true") {
      pSendMesgContext->setMsgType(TRACE_TRANS_HALF_MSG);
    }
    if (!msg.getProperty("__STARTDELIVERTIME").empty() ||
        !msg.getProperty(MQMessage::PROPERTY_DELAY_TIME_LEVEL).empty()) {
      pSendMesgContext->setMsgType(TRACE_DELAY_MSG);
    }
    executeSendMessageHookBefore(pSendMesgContext.get());
  }

  SendMessageRequestHeader requestHeader;
  requestHeader.producerGroup = getGroupName();
  requestHeader.topic = msg.getTopic();
  requestHeader.defaultTopic = DEFAULT_TOPIC;
  requestHeader.defaultTopicQueueNums = 4;
  requestHeader.queueId = mq.getQueueId();
  requestHeader.sysFlag = msg.getSysFlag();
  requestHeader.bornTimestamp = UtilAll::currentTimeMillis();
  requestHeader.flag = msg.getFlag();
  requestHeader.consumeRetryTimes = 16;
  requestHeader.batch = isBatchMsg;
  requestHeader.properties = MQDecoder::messageProperties2String(msg.getProperties());

  SendResult sendResult = getFactory()->getMQClientAPIImpl()->sendMessage(
      brokerAddr, mq.getBrokerName(), msg, requestHeader, getSendMsgTimeout(), getRetryTimes4Async(),
      communicationMode, sendCallback, getSessionCredentials());

  // The "after" hooks only run here for synchronous sends without a callback.
  if (traceHooksEnabled && sendCallback == NULL && communicationMode == ComMode_SYNC) {
    pSendMesgContext->setSendResult(sendResult);
    executeSendMessageHookAfter(pSendMesgContext.get());
  }
  return sendResult;
}