in src/producer/TopicPublishInfo.h [175:248]
/**
 * Pick a message queue to send to, holding m_queuelock for the whole call.
 *
 * Scans m_queues once, starting at `mq_index % m_queues.size()` and wrapping
 * around, looking for an entry whose key (broker name + queue id) is present
 * in m_onSerivceQueues. When lastmq carries a non-empty broker name, entries
 * on that same broker are skipped during this scan. If the scan finds
 * nothing, the first entry of m_nonSerivceQueues whose broker name differs
 * from lastmq's broker name is returned instead.
 *
 * m_sendWhichQueue is incremented once per visited entry and is reset to 0
 * before it would exceed the maximum int value.
 *
 * @param lastmq   Queue used previously; its broker name (if any) is avoided.
 * @param mq_index In: scan start position, must be non-negative.
 *                 Out: position of the returned queue when it was taken from
 *                 m_queues. When lastmq's broker name is empty it is written
 *                 on every visited position, so after a failed scan it holds
 *                 the last position tried; otherwise it is written only on
 *                 success. It is never updated by the m_nonSerivceQueues
 *                 fallback.
 * @return The selected queue, or a default-constructed MQMessageQueue when
 *         m_queues is empty, mq_index is negative, or nothing qualifies.
 */
MQMessageQueue selectOneActiveMessageQueue(const MQMessageQueue& lastmq, int& mq_index) {
  boost::lock_guard<boost::mutex> lock(m_queuelock);

  if (m_queues.empty()) {
    LOG_ERROR("m_queues empty");
    return MQMessageQueue();
  }
  if (mq_index < 0) {
    LOG_ERROR("mq_index is negative");
    return MQMessageQueue();
  }

  // Both values are invariant while the lock is held, so evaluate them once.
  const size_t queueCount = m_queues.size();
  const string& lastBroker = lastmq.getBrokerName();
  const bool avoidLastBroker = !lastBroker.empty();

  unsigned int pos = mq_index % queueCount;

  for (size_t visited = 0; visited < queueCount; ++visited) {
    // Reset the send counter before the increment below could overflow int.
    if (m_sendWhichQueue.load(boost::memory_order_acquire) == (numeric_limits<int>::max)()) {
      m_sendWhichQueue.store(0, boost::memory_order_release);
    }
    // Wrap around so the scan covers the entries before the start position.
    if (pos >= queueCount) {
      pos = pos % queueCount;
    }
    ++m_sendWhichQueue;

    if (!avoidLastBroker) {
      LOG_DEBUG("lastmq broker empty, m_sendWhichQueue:%d, pos:%d",
                m_sendWhichQueue.load(boost::memory_order_acquire), pos);
      // Without a broker to avoid, the index is reported for every position
      // tried, not only for the one that is finally returned.
      mq_index = pos;
    }

    MQMessageQueue mq = m_queues.at(pos);
    string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
    const bool brokerAllowed = !avoidLastBroker || (mq.getBrokerName().compare(lastBroker) != 0);
    if (brokerAllowed && (m_onSerivceQueues.find(key) != m_onSerivceQueues.end())) {
      mq_index = pos;
      return mq;
    }
    ++pos;
  }

  // Nothing usable in m_onSerivceQueues: fall back to m_nonSerivceQueues,
  // still skipping anything on lastmq's broker.
  for (MQMAP::iterator it = m_nonSerivceQueues.begin(); it != m_nonSerivceQueues.end(); ++it) {
    if (it->second.getBrokerName().compare(lastBroker) != 0) {
      return it->second;
    }
  }

  LOG_ERROR("can not find property mq");
  return MQMessageQueue();
}