in src/MQClientFactory.cpp [1125:1180]
void MQClientFactory::resetOffset(const string& group,
const string& topic,
const map<MQMessageQueue, int64>& offsetTable) {
MQConsumer* pConsumer = selectConsumer(group);
if (pConsumer) {
map<MQMessageQueue, int64>::const_iterator it = offsetTable.begin();
for (; it != offsetTable.end(); ++it) {
MQMessageQueue mq = it->first;
boost::weak_ptr<PullRequest> pullRequest = pConsumer->getRebalance()->getPullRequest(mq);
boost::shared_ptr<PullRequest> pullreq = pullRequest.lock();
// PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
if (pullreq) {
pullreq->setDropped(true);
LOG_INFO("resetOffset setDropped for mq:%s", mq.toString().data());
pullreq->clearAllMsgs();
pullreq->updateQueueMaxOffset(it->second);
} else {
LOG_ERROR("no corresponding pullRequest found for topic:%s", topic.c_str());
}
}
for (it = offsetTable.begin(); it != offsetTable.end(); ++it) {
MQMessageQueue mq = it->first;
if (topic == mq.getTopic()) {
LOG_INFO("offset sets to:%lld", it->second);
pConsumer->updateConsumeOffset(mq, it->second);
}
}
pConsumer->persistConsumerOffsetByResetOffset();
boost::this_thread::sleep_for(boost::chrono::milliseconds(10));
for (it = offsetTable.begin(); it != offsetTable.end(); ++it) {
MQMessageQueue mq = it->first;
if (topic == mq.getTopic()) {
LOG_DEBUG("resetOffset sets to:%lld for mq:%s", it->second, mq.toString().c_str());
pConsumer->updateConsumeOffset(mq, it->second);
}
}
pConsumer->persistConsumerOffsetByResetOffset();
for (it = offsetTable.begin(); it != offsetTable.end(); ++it) {
MQMessageQueue mq = it->first;
if (topic == mq.getTopic()) {
pConsumer->removeConsumeOffset(mq);
}
}
// do call pConsumer->doRebalance directly here, as it is conflict with
// timerCB_doRebalance;
doRebalanceByConsumerGroup(pConsumer->getGroupName());
} else {
LOG_ERROR("no corresponding consumer found for group:%s", group.c_str());
}
}