in src/producer/TopicPublishInfo.h [125:173]
MQMessageQueue selectOneMessageQueue(const MQMessageQueue& lastmq, int& mq_index) {
boost::lock_guard<boost::mutex> lock(m_queuelock);
if (m_queues.size() > 0) {
LOG_DEBUG("selectOneMessageQueue Enter, queue size:" SIZET_FMT "", m_queues.size());
unsigned int pos = 0;
if (mq_index >= 0) {
pos = mq_index % m_queues.size();
} else {
LOG_ERROR("mq_index is negative");
return MQMessageQueue();
}
if (!lastmq.getBrokerName().empty()) {
for (size_t i = 0; i < m_queues.size(); i++) {
if (m_sendWhichQueue.load(boost::memory_order_acquire) == (numeric_limits<int>::max)()) {
m_sendWhichQueue.store(0, boost::memory_order_release);
}
if (pos >= m_queues.size())
pos = pos % m_queues.size();
++m_sendWhichQueue;
MQMessageQueue mq = m_queues.at(pos);
LOG_DEBUG("lastmq broker not empty, m_sendWhichQueue:%d, pos:%d",
m_sendWhichQueue.load(boost::memory_order_acquire), pos);
if (mq.getBrokerName().compare(lastmq.getBrokerName()) != 0) {
mq_index = pos;
return mq;
}
++pos;
}
LOG_ERROR("could not find property mq");
return MQMessageQueue();
} else {
if (m_sendWhichQueue.load(boost::memory_order_acquire) == (numeric_limits<int>::max)()) {
m_sendWhichQueue.store(0, boost::memory_order_release);
}
++m_sendWhichQueue;
LOG_DEBUG("lastmq broker empty, m_sendWhichQueue:%d, pos:%d",
m_sendWhichQueue.load(boost::memory_order_acquire), pos);
mq_index = pos;
return m_queues.at(pos);
}
} else {
LOG_ERROR("m_queues empty");
return MQMessageQueue();
}
}