in src/producer/DefaultMQProducerImpl.cpp [373:435]
SendResult DefaultMQProducerImpl::sendDefaultImpl(MQMessage& msg,
int communicationMode,
SendCallback* pSendCallback,
bool bActiveMQ) {
MQMessageQueue lastmq;
int mq_index = 0;
for (int times = 1; times <= m_retryTimes; times++) {
boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo(
getFactory()->tryToFindTopicPublishInfo(msg.getTopic(), getSessionCredentials()));
boost::shared_ptr<TopicPublishInfo> topicPublishInfo(weak_topicPublishInfo.lock());
if (topicPublishInfo) {
if (times == 1) {
mq_index = topicPublishInfo->getWhichQueue();
} else {
mq_index++;
}
SendResult sendResult;
MQMessageQueue mq;
if (bActiveMQ)
mq = topicPublishInfo->selectOneActiveMessageQueue(lastmq, mq_index);
else
mq = topicPublishInfo->selectOneMessageQueue(lastmq, mq_index);
lastmq = mq;
if (mq.getQueueId() == -1) {
// THROW_MQEXCEPTION(MQClientException, "the MQMessageQueue is
// invalide", -1);
continue;
}
try {
LOG_DEBUG("send to mq:%s", mq.toString().data());
sendResult = sendKernelImpl(msg, mq, communicationMode, pSendCallback);
switch (communicationMode) {
case ComMode_ASYNC:
return sendResult;
case ComMode_ONEWAY:
return sendResult;
case ComMode_SYNC:
if (sendResult.getSendStatus() != SEND_OK) {
if (bActiveMQ) {
topicPublishInfo->updateNonServiceMessageQueue(mq, getSendMsgTimeout());
}
continue;
}
return sendResult;
default:
break;
}
} catch (...) {
LOG_ERROR("send failed of times:%d,brokerName:%s", times, mq.getBrokerName().c_str());
if (bActiveMQ) {
topicPublishInfo->updateNonServiceMessageQueue(mq, getSendMsgTimeout());
}
continue;
}
} // end of for
LOG_WARN("Retry many times, still failed");
}
string info = "No route info of this topic: " + msg.getTopic();
THROW_MQEXCEPTION(MQClientException, info, -1);
}