SendResult DefaultMQProducerImpl::sendAutoRetrySelectImpl()

in src/producer/DefaultMQProducerImpl.cpp [540:620]


// Sends `msg` to a queue chosen by `pSelector`, retrying on another queue when
// an attempt fails.
//
// Parameters:
//   msg               message to send; checked with Validators::checkMessage
//                     against getMaxMessageSize() before any attempt.
//   pSelector         selector used for the first attempt only; it is
//                     dereferenced unconditionally, so it must not be null.
//   pArg              opaque argument forwarded to pSelector->select().
//   communicationMode ComMode_SYNC / ComMode_ASYNC / ComMode_ONEWAY, forwarded
//                     to sendKernelImpl().
//   pSendCallback     forwarded to sendKernelImpl().
//   autoRetryTimes    number of retries; the loop runs at most
//                     autoRetryTimes + 1 times.
//   bActiveMQ         when true, retries pick the queue with
//                     selectOneActiveMessageQueue() and a failed queue is
//                     reported through updateNonServiceMessageQueue().
//
// Returns:
//   ASYNC / ONEWAY: the result of the first sendKernelImpl() call that does not
//   throw. SYNC: the first result whose status is SEND_OK.
//
// Errors (raised through THROW_MQEXCEPTION as MQClientException) once every
// attempt has been used up:
//   - if any attempt ended in an exception, the detail of the most recent one;
//   - otherwise "No route info of this topic, ".
// NOTE(review): a SYNC run whose attempts all return a non-SEND_OK status
// without throwing still ends with the "No route info" message, as before.
SendResult DefaultMQProducerImpl::sendAutoRetrySelectImpl(MQMessage& msg,
                                                          MessageQueueSelector* pSelector,
                                                          void* pArg,
                                                          int communicationMode,
                                                          SendCallback* pSendCallback,
                                                          int autoRetryTimes,
                                                          bool bActiveMQ) {
  Validators::checkMessage(msg, getMaxMessageSize());

  MQMessageQueue lastmq;
  MQMessageQueue mq;
  int mq_index = 0;
  bool send_failed = false;
  string failed_detail;
  for (int times = 1; times <= autoRetryTimes + 1; times++) {
    // Route info is looked up again on every attempt.
    boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo(
        getFactory()->tryToFindTopicPublishInfo(msg.getTopic(), getSessionCredentials()));
    boost::shared_ptr<TopicPublishInfo> topicPublishInfo(weak_topicPublishInfo.lock());
    if (!topicPublishInfo) {
      // Nothing to send to in this round; try the lookup again next iteration.
      continue;
    }

    if (times == 1) {
      // Always send to the selected queue first, even if bActiveMQ is set to true.
      mq = pSelector->select(topicPublishInfo->getMessageQueueList(), msg, pArg);
      lastmq = mq;
    } else {
      LOG_INFO("sendAutoRetrySelectImpl with times:%d", times);
      // Locate the previously used queue in the current list so the next
      // selection can start from its position. If it is no longer listed,
      // mq_index keeps its previous value.
      std::vector<MQMessageQueue> mqs(topicPublishInfo->getMessageQueueList());
      for (size_t i = 0; i < mqs.size(); i++) {
        if (mqs[i] == lastmq)
          mq_index = static_cast<int>(i);
      }
      if (bActiveMQ)
        mq = topicPublishInfo->selectOneActiveMessageQueue(lastmq, mq_index);
      else
        mq = topicPublishInfo->selectOneMessageQueue(lastmq, mq_index);
      lastmq = mq;
      if (mq.getQueueId() == -1) {
        // No usable queue was returned; skip sending for this attempt.
        continue;
      }
    }

    try {
      LOG_DEBUG("send to broker:%s", mq.toString().c_str());
      SendResult sendResult;
      sendResult = sendKernelImpl(msg, mq, communicationMode, pSendCallback);
      switch (communicationMode) {
        case ComMode_ASYNC:
        case ComMode_ONEWAY:
          // The immediate result is handed back without inspecting its status.
          return sendResult;
        case ComMode_SYNC:
          if (sendResult.getSendStatus() == SEND_OK) {
            return sendResult;
          }
          if (bActiveMQ) {
            topicPublishInfo->updateNonServiceMessageQueue(mq, getSendMsgTimeout());
          }
          continue;  // next attempt of the enclosing for loop
        default:
          break;
      }
    } catch (const std::exception& e) {
      // Remember the failure so the final exception reports the real cause
      // instead of a misleading "no route" message.
      send_failed = true;
      failed_detail = e.what();
      LOG_ERROR("send failed of times:%d,mq:%s,details:%s", times, mq.toString().c_str(), e.what());
      if (bActiveMQ) {
        topicPublishInfo->updateNonServiceMessageQueue(mq, getSendMsgTimeout());
      }
    } catch (...) {
      // Handle unknown exceptions the same way as std::exception ones: record
      // the failure, flag the queue, and move on to the next attempt.
      send_failed = true;
      failed_detail = "An unknown exception occurred while sending message";
      LOG_ERROR("An unknown exception occurred,send failed of times:%d,mq:%s", times, mq.toString().c_str());
      if (bActiveMQ) {
        topicPublishInfo->updateNonServiceMessageQueue(mq, getSendMsgTimeout());
      }
    }
  }  // end of for

  // Every attempt has been used up without a successful send.
  LOG_WARN("Retry many times, still failed");
  if (send_failed) {
    THROW_MQEXCEPTION(MQClientException, failed_detail, -1);
  }
  THROW_MQEXCEPTION(MQClientException, "No route info of this topic, ", -1);
}