in src/consumer/Rebalance.cpp [462:553]
bool RebalancePush::updateRequestTableInRebalance(const string& topic, vector<MQMessageQueue>& mqsSelf) {
LOG_DEBUG("updateRequestTableInRebalance for Topic[%s] Enter", topic.c_str());
// 1. Clear no in charge of
// 1. set dropped
// 2. clear local message
// 3. clear offset
// 4. remove request table
// 5. set flag for route changed
// 2. Check and clear dropped/invalid pullrequest(timeout and so on)
// 3. Add new mq in charge of
// 1. new pullrequest
// 2. init next pull offset
// 3. int offset
// 4. add request table
// 5. set flag for route changed
// 4. Start long pull for request
if (mqsSelf.empty()) {
LOG_WARN("allocated queue is empty for topic:%s", topic.c_str());
}
bool changed = false;
//<!remove none responsive mq
MQ2PULLREQ requestQueueTable(getPullRequestTable());
MQ2PULLREQ::iterator itDel = requestQueueTable.begin();
for (; itDel != requestQueueTable.end(); ++itDel) {
MQMessageQueue mqtemp = itDel->first;
if (mqtemp.getTopic().compare(topic) == 0) {
if (mqsSelf.empty() || (std::find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == mqsSelf.end())) {
// if not response , set to dropped
LOG_INFO("Drop mq:%s,because not responsive", mqtemp.toString().c_str());
itDel->second->setDropped(true);
// remove offset table to avoid offset backup
removeUnnecessaryMessageQueue(mqtemp);
itDel->second->clearAllMsgs();
removePullRequest(mqtemp);
changed = true;
} else if (itDel->second->isPullRequestExpired()) {
// if pull expired , set to dropped, eg: if add pull task error, the pull request will be expired.
LOG_INFO("Drop mq:%s according Pull timeout.", mqtemp.toString().c_str());
itDel->second->setDropped(true);
removeUnnecessaryMessageQueue(mqtemp);
itDel->second->clearAllMsgs();
removePullRequest(mqtemp);
changed = true;
}
}
}
//<!add check new mq added.
vector<boost::shared_ptr<PullRequest>> pullRequestsToAdd;
vector<MQMessageQueue>::iterator itAdd = mqsSelf.begin();
for (; itAdd != mqsSelf.end(); ++itAdd) {
if (isPullRequestExist(*itAdd)) {
// have check the expired pull request, re-add it.
continue;
}
boost::shared_ptr<PullRequest> pullRequest = boost::make_shared<PullRequest>(m_pConsumer->getGroupName());
pullRequest->m_messageQueue = *itAdd;
int64 nextOffset = computePullFromWhere(*itAdd);
if (nextOffset >= 0) {
pullRequest->setNextOffset(nextOffset);
pullRequest->setDropped(false);
changed = true;
addPullRequest(*itAdd, pullRequest);
pullRequestsToAdd.push_back(pullRequest);
LOG_INFO("Add mq:%s, request initial offset:%ld", (*itAdd).toString().c_str(), nextOffset);
} else {
LOG_WARN(
"Failed to add pull request for %s due to failure of querying consume offset, request initial offset:%ld",
(*itAdd).toString().c_str(), nextOffset);
}
}
for (vector<boost::shared_ptr<PullRequest>>::iterator itAdded = pullRequestsToAdd.begin();
itAdded != pullRequestsToAdd.end(); ++itAdded) {
LOG_INFO("Start to pull %s, offset:%ld, GroupName %s", (*itAdded)->m_messageQueue.toString().c_str(),
(*itAdded)->getNextOffset(), (*itAdded)->getGroupName().c_str());
if (!m_pConsumer->producePullMsgTask(*itAdded)) {
LOG_WARN(
"Failed to producer pull message task for %s, Remove it from Request table and wait for next #Rebalance.",
(*itAdded)->m_messageQueue.toString().c_str());
// remove from request table, and wait for next rebalance.
(*itAdded)->setDropped(true);
removePullRequest((*itAdded)->m_messageQueue);
}
}
LOG_DEBUG("updateRequestTableInRebalance Topic[%s] exit", topic.c_str());
return changed;
}