SendResult DefaultMQProducerImpl::sendDefaultImpl()

in src/producer/DefaultMQProducerImpl.cpp [373:435]


SendResult DefaultMQProducerImpl::sendDefaultImpl(MQMessage& msg,
                                                  int communicationMode,
                                                  SendCallback* pSendCallback,
                                                  bool bActiveMQ) {
  MQMessageQueue lastmq;
  int mq_index = 0;
  for (int times = 1; times <= m_retryTimes; times++) {
    boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo(
        getFactory()->tryToFindTopicPublishInfo(msg.getTopic(), getSessionCredentials()));
    boost::shared_ptr<TopicPublishInfo> topicPublishInfo(weak_topicPublishInfo.lock());
    if (topicPublishInfo) {
      if (times == 1) {
        mq_index = topicPublishInfo->getWhichQueue();
      } else {
        mq_index++;
      }

      SendResult sendResult;
      MQMessageQueue mq;
      if (bActiveMQ)
        mq = topicPublishInfo->selectOneActiveMessageQueue(lastmq, mq_index);
      else
        mq = topicPublishInfo->selectOneMessageQueue(lastmq, mq_index);

      lastmq = mq;
      if (mq.getQueueId() == -1) {
        // THROW_MQEXCEPTION(MQClientException, "the MQMessageQueue is
        // invalide", -1);
        continue;
      }

      try {
        LOG_DEBUG("send to mq:%s", mq.toString().data());
        sendResult = sendKernelImpl(msg, mq, communicationMode, pSendCallback);
        switch (communicationMode) {
          case ComMode_ASYNC:
            return sendResult;
          case ComMode_ONEWAY:
            return sendResult;
          case ComMode_SYNC:
            if (sendResult.getSendStatus() != SEND_OK) {
              if (bActiveMQ) {
                topicPublishInfo->updateNonServiceMessageQueue(mq, getSendMsgTimeout());
              }
              continue;
            }
            return sendResult;
          default:
            break;
        }
      } catch (...) {
        LOG_ERROR("send failed of times:%d,brokerName:%s", times, mq.getBrokerName().c_str());
        if (bActiveMQ) {
          topicPublishInfo->updateNonServiceMessageQueue(mq, getSendMsgTimeout());
        }
        continue;
      }
    }  // end of for
    LOG_WARN("Retry many times, still failed");
  }
  string info = "No route info of this topic: " + msg.getTopic();
  THROW_MQEXCEPTION(MQClientException, info, -1);
}