void MQClientFactory::resetOffset()

in src/MQClientFactory.cpp [1125:1180]


void MQClientFactory::resetOffset(const string& group,
                                  const string& topic,
                                  const map<MQMessageQueue, int64>& offsetTable) {
  MQConsumer* pConsumer = selectConsumer(group);
  if (pConsumer) {
    map<MQMessageQueue, int64>::const_iterator it = offsetTable.begin();

    for (; it != offsetTable.end(); ++it) {
      MQMessageQueue mq = it->first;
      boost::weak_ptr<PullRequest> pullRequest = pConsumer->getRebalance()->getPullRequest(mq);
      boost::shared_ptr<PullRequest> pullreq = pullRequest.lock();
      // PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
      if (pullreq) {
        pullreq->setDropped(true);
        LOG_INFO("resetOffset setDropped for mq:%s", mq.toString().data());
        pullreq->clearAllMsgs();
        pullreq->updateQueueMaxOffset(it->second);
      } else {
        LOG_ERROR("no corresponding pullRequest found for topic:%s", topic.c_str());
      }
    }

    for (it = offsetTable.begin(); it != offsetTable.end(); ++it) {
      MQMessageQueue mq = it->first;
      if (topic == mq.getTopic()) {
        LOG_INFO("offset sets to:%lld", it->second);
        pConsumer->updateConsumeOffset(mq, it->second);
      }
    }
    pConsumer->persistConsumerOffsetByResetOffset();

    boost::this_thread::sleep_for(boost::chrono::milliseconds(10));

    for (it = offsetTable.begin(); it != offsetTable.end(); ++it) {
      MQMessageQueue mq = it->first;
      if (topic == mq.getTopic()) {
        LOG_DEBUG("resetOffset sets to:%lld for mq:%s", it->second, mq.toString().c_str());
        pConsumer->updateConsumeOffset(mq, it->second);
      }
    }
    pConsumer->persistConsumerOffsetByResetOffset();

    for (it = offsetTable.begin(); it != offsetTable.end(); ++it) {
      MQMessageQueue mq = it->first;
      if (topic == mq.getTopic()) {
        pConsumer->removeConsumeOffset(mq);
      }
    }

    // do call pConsumer->doRebalance directly here, as it is conflict with
    // timerCB_doRebalance;
    doRebalanceByConsumerGroup(pConsumer->getGroupName());
  } else {
    LOG_ERROR("no corresponding consumer found for group:%s", group.c_str());
  }
}