in src/producer/DefaultMQProducerImpl.cpp [531:602]
SendResult DefaultMQProducerImpl::sendAutoRetrySelectImpl(MQMessage& msg,
MessageQueueSelector* pSelector,
void* pArg,
int communicationMode,
SendCallback* pSendCallback,
int autoRetryTimes,
bool bActiveMQ) {
Validators::checkMessage(msg, getMaxMessageSize());
MQMessageQueue lastmq;
MQMessageQueue mq;
int mq_index = 0;
for (int times = 1; times <= autoRetryTimes + 1; times++) {
boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo(
getFactory()->tryToFindTopicPublishInfo(msg.getTopic(), getSessionCredentials()));
boost::shared_ptr<TopicPublishInfo> topicPublishInfo(weak_topicPublishInfo.lock());
if (topicPublishInfo) {
SendResult sendResult;
if (times == 1) {
// always send to selected MQ firstly, evenif bActiveMQ was setted to true
mq = pSelector->select(topicPublishInfo->getMessageQueueList(), msg, pArg);
lastmq = mq;
} else {
LOG_INFO("sendAutoRetrySelectImpl with times:%d", times);
std::vector<MQMessageQueue> mqs(topicPublishInfo->getMessageQueueList());
for (size_t i = 0; i < mqs.size(); i++) {
if (mqs[i] == lastmq)
mq_index = i;
}
if (bActiveMQ)
mq = topicPublishInfo->selectOneActiveMessageQueue(lastmq, mq_index);
else
mq = topicPublishInfo->selectOneMessageQueue(lastmq, mq_index);
lastmq = mq;
if (mq.getQueueId() == -1) {
// THROW_MQEXCEPTION(MQClientException, "the MQMessageQueue is
// invalide", -1);
continue;
}
}
try {
LOG_DEBUG("send to broker:%s", mq.toString().c_str());
sendResult = sendKernelImpl(msg, mq, communicationMode, pSendCallback);
switch (communicationMode) {
case ComMode_ASYNC:
return sendResult;
case ComMode_ONEWAY:
return sendResult;
case ComMode_SYNC:
if (sendResult.getSendStatus() != SEND_OK) {
if (bActiveMQ) {
topicPublishInfo->updateNonServiceMessageQueue(mq, getSendMsgTimeout());
}
continue;
}
return sendResult;
default:
break;
}
} catch (...) {
LOG_ERROR("send failed of times:%d,mq:%s", times, mq.toString().c_str());
if (bActiveMQ) {
topicPublishInfo->updateNonServiceMessageQueue(mq, getSendMsgTimeout());
}
continue;
}
} // end of for
LOG_WARN("Retry many times, still failed");
}
THROW_MQEXCEPTION(MQClientException, "No route info of this topic, ", -1);
}