in src/consumer/Rebalance.cpp [55:151]
void Rebalance::doRebalance() {
LOG_DEBUG("start doRebalance");
try {
map<string, SubscriptionData*>::iterator it = m_subscriptionData.begin();
for (; it != m_subscriptionData.end(); ++it) {
string topic = (it->first);
LOG_DEBUG("current topic is:%s", topic.c_str());
//<!topic -> mqs
vector<MQMessageQueue> mqAll;
if (!getTopicSubscribeInfo(topic, mqAll)) {
continue;
}
if (mqAll.empty()) {
if (!UtilAll::startsWith_retry(topic)) {
std::string msg("#doRebalance. mqAll for topic:");
msg.append(topic);
msg.append(" is empty");
LOG_ERROR("Queues to allocate are empty. Msg: %s", msg.c_str());
// to check, return error or throw exception
THROW_MQEXCEPTION(MQClientException, msg, -1);
}
}
//<!msg model;
switch (m_pConsumer->getMessageModel()) {
case BROADCASTING: {
bool changed = updateRequestTableInRebalance(topic, mqAll);
if (changed) {
messageQueueChanged(topic, mqAll, mqAll);
}
break;
}
case CLUSTERING: {
vector<string> cidAll;
m_pClientFactory->findConsumerIds(topic, m_pConsumer->getGroupName(), cidAll,
m_pConsumer->getSessionCredentials());
if (cidAll.empty()) {
LOG_ERROR("[ERROR] Get empty consumer IDs. Consumer Group: %s, Topic: %s",
m_pConsumer->getGroupName().c_str(), topic.c_str());
// Should skip this round of re-balance immediately if consumer ID set is empty.
THROW_MQEXCEPTION(MQClientException, "doRebalance the cidAll is empty", -1);
}
// log
for (int i = 0; i < (int)cidAll.size(); ++i) {
LOG_DEBUG("client id:%s of topic:%s", cidAll[i].c_str(), topic.c_str());
}
//<! sort;
sort(mqAll.begin(), mqAll.end());
sort(cidAll.begin(), cidAll.end());
//<! allocate;
vector<MQMessageQueue> allocateResult;
try {
m_pAllocateMQStrategy->allocate(m_pConsumer->getMQClientId(), mqAll, cidAll, allocateResult);
} catch (MQException& e) {
std::string errMsg("Allocate message queue for ConsumerGroup[");
errMsg.append(m_pConsumer->getGroupName());
errMsg.append("],Topic[");
errMsg.append(topic);
errMsg.append("] failed. ");
LOG_ERROR("%s", errMsg.c_str());
THROW_MQEXCEPTION(MQClientException, errMsg, -1);
}
// log
for (int i = 0; i < (int)allocateResult.size(); ++i) {
LOG_DEBUG("allocate mq:%s", allocateResult[i].toString().c_str());
}
//<!update local;
bool changed = updateRequestTableInRebalance(topic, allocateResult);
if (changed) {
std::stringstream ss;
ss << "Allocation result for [Consumer Group: " << m_pConsumer->getGroupName() << ", Topic: " << topic
<< ", Current Consumer ID: " << m_pConsumer->getMQClientId() << "] is changed.\n"
<< "Total Queue :#" << mqAll.size() << ", Total Consumer :#" << cidAll.size()
<< ", Allocated Queues are: \n";
for (vector<MQMessageQueue>::size_type i = 0; i < allocateResult.size(); ++i) {
ss << allocateResult[i].toString() << "\n";
}
// Log allocation result.
LOG_INFO("%s", ss.str().c_str());
messageQueueChanged(topic, mqAll, allocateResult);
break;
}
}
default:
break;
}
}
} catch (MQException& e) {
LOG_ERROR("%s", e.what());
}
}