MQMessageQueue selectOneActiveMessageQueue()

in src/producer/TopicPublishInfo.h [175:248]


  /**
   * Select a queue from m_queues that is currently registered in
   * m_onSerivceQueues, scanning round-robin starting at mq_index.
   *
   * The whole call runs with m_queuelock held.
   *
   * Selection rules:
   *  - If lastmq carries a non-empty broker name, queues on that same broker
   *    are skipped; mq_index is written only when a queue is selected.
   *  - If lastmq's broker name is empty, no broker is excluded; mq_index is
   *    overwritten with the probed position on every probe (so it holds the
   *    last probed position even when nothing on-service is found).
   *  - If no on-service queue qualifies, the first entry of m_nonSerivceQueues
   *    whose broker name differs from lastmq's broker name is returned
   *    (mq_index is not updated for this fallback).
   *
   * m_sendWhichQueue is incremented once per probed slot and reset to 0 when
   * it reaches numeric_limits<int>::max(); it does not influence which slot
   * is probed in this function.
   *
   * @param lastmq    Queue used by the previous attempt; its broker name is
   *                  the one to avoid (may be empty).
   * @param mq_index  In: starting position (must be >= 0, taken modulo the
   *                  number of queues). Out: position of the selected queue,
   *                  see rules above.
   * @return The selected queue, or a default-constructed MQMessageQueue when
   *         m_queues is empty, mq_index is negative, or no candidate exists.
   */
  MQMessageQueue selectOneActiveMessageQueue(const MQMessageQueue& lastmq, int& mq_index) {
    boost::lock_guard<boost::mutex> lock(m_queuelock);

    // The queue list cannot change while the lock is held, so its size is
    // read once.
    const size_t queueCount = m_queues.size();
    if (queueCount == 0) {
      LOG_ERROR("m_queues empty");
      return MQMessageQueue();
    }
    if (mq_index < 0) {
      LOG_ERROR("mq_index is negative");
      return MQMessageQueue();
    }

    // Loop invariants: the broker to avoid and whether there is one at all.
    const string lastBroker = lastmq.getBrokerName();
    const bool avoidLastBroker = !lastBroker.empty();

    unsigned int pos = mq_index % queueCount;
    for (size_t probed = 0; probed < queueCount; ++probed) {
      // Guard the counter against signed overflow before incrementing it.
      if (m_sendWhichQueue.load(boost::memory_order_acquire) == (numeric_limits<int>::max)()) {
        m_sendWhichQueue.store(0, boost::memory_order_release);
      }
      if (pos >= queueCount) {
        pos = pos % queueCount;  // wrap around after stepping past the end
      }
      ++m_sendWhichQueue;

      if (!avoidLastBroker) {
        LOG_DEBUG("lastmq broker empty, m_sendWhichQueue:%d, pos:%d",
                  m_sendWhichQueue.load(boost::memory_order_acquire), pos);
        // Without a broker to avoid, the caller's index tracks every probe.
        mq_index = pos;
      }

      // Bind by reference: the element is only copied if it is returned.
      const MQMessageQueue& candidate = m_queues.at(pos);
      const string broker = candidate.getBrokerName();
      // m_onSerivceQueues is looked up by broker name followed by queue id.
      const string key = broker + UtilAll::to_string(candidate.getQueueId());
      const bool brokerAllowed = !avoidLastBroker || broker.compare(lastBroker) != 0;
      if (brokerAllowed && m_onSerivceQueues.find(key) != m_onSerivceQueues.end()) {
        mq_index = pos;
        return candidate;
      }
      ++pos;
    }

    // No acceptable on-service queue: fall back to any queue recorded in
    // m_nonSerivceQueues whose broker name differs from lastmq's.
    for (MQMAP::iterator it = m_nonSerivceQueues.begin(); it != m_nonSerivceQueues.end(); ++it) {
      if (it->second.getBrokerName().compare(lastBroker) != 0) {
        return it->second;
      }
    }

    LOG_ERROR("can not find property mq");
    return MQMessageQueue();
  }