MQMessageQueue selectOneMessageQueue()

in src/producer/TopicPublishInfo.h [125:173]


  /**
   * Select a message queue and report its index through mq_index.
   *
   * The whole selection runs while holding m_queuelock. The starting position
   * is mq_index modulo the number of queues.
   *  - If lastmq has an empty broker name, the queue at the starting position
   *    is returned.
   *  - Otherwise the queues are scanned circularly from the starting position
   *    (at most one full pass) and the first queue whose broker name differs
   *    from lastmq's broker name is returned.
   *
   * m_sendWhichQueue is incremented once per examined queue; it is reset to 0
   * beforehand whenever it has reached the maximum int value.
   *
   * @param lastmq    queue whose broker should be avoided; an empty broker
   *                  name disables the avoidance.
   * @param mq_index  in: non-negative start index; out: index of the returned
   *                  queue (left unchanged when no queue is selected).
   * @return the selected queue, or a default-constructed MQMessageQueue when
   *         the queue list is empty, mq_index is negative, or every queue has
   *         the same broker name as lastmq.
   */
  MQMessageQueue selectOneMessageQueue(const MQMessageQueue& lastmq, int& mq_index) {
    boost::lock_guard<boost::mutex> lock(m_queuelock);

    if (m_queues.empty()) {
      LOG_ERROR("m_queues empty");
      return MQMessageQueue();
    }

    LOG_DEBUG("selectOneMessageQueue Enter, queue size:" SIZET_FMT "", m_queues.size());
    if (mq_index < 0) {
      LOG_ERROR("mq_index is negative");
      return MQMessageQueue();
    }

    // The lock is held for the whole call, so the size and the element
    // references taken below stay valid until we return.
    const size_t queueCount = m_queues.size();
    unsigned int pos = static_cast<unsigned int>(mq_index % queueCount);

    // Loop-invariant: binding to a const reference works whether
    // getBrokerName() returns by value (lifetime is extended) or by reference.
    const std::string& avoidBroker = lastmq.getBrokerName();

    if (avoidBroker.empty()) {
      // No broker to avoid: take the queue at the starting position.
      if (m_sendWhichQueue.load(boost::memory_order_acquire) == (numeric_limits<int>::max)()) {
        m_sendWhichQueue.store(0, boost::memory_order_release);
      }
      ++m_sendWhichQueue;
      LOG_DEBUG("lastmq broker empty, m_sendWhichQueue:%d, pos:%d",
                m_sendWhichQueue.load(boost::memory_order_acquire), pos);
      mq_index = static_cast<int>(pos);
      return m_queues.at(pos);
    }

    // Walk the queues circularly, at most once around, looking for a queue
    // that lives on a different broker than lastmq.
    for (size_t scanned = 0; scanned < queueCount; ++scanned, ++pos) {
      // Reset before incrementing so the counter never goes past INT_MAX.
      if (m_sendWhichQueue.load(boost::memory_order_acquire) == (numeric_limits<int>::max)()) {
        m_sendWhichQueue.store(0, boost::memory_order_release);
      }

      if (pos >= queueCount) {
        pos = pos % queueCount;  // wrap around to the front of the list
      }

      ++m_sendWhichQueue;
      // Inspect the candidate in place; only the queue actually returned is copied.
      const MQMessageQueue& candidate = m_queues.at(pos);
      LOG_DEBUG("lastmq broker not empty, m_sendWhichQueue:%d, pos:%d",
                m_sendWhichQueue.load(boost::memory_order_acquire), pos);
      if (candidate.getBrokerName() != avoidBroker) {
        mq_index = static_cast<int>(pos);
        return candidate;
      }
    }

    LOG_ERROR("could not find property mq");
    return MQMessageQueue();
  }