int main()

in example/PullConsumer.cpp [44:120]


int main(int argc, char* argv[]) {
  RocketmqSendAndConsumerArgs info;
  if (!ParseArgs(argc, argv, &info)) {
    exit(-1);
  }
  PrintRocketmqSendAndConsumerArgs(info);
  DefaultMQPullConsumer consumer("please_rename_unique_group_name");
  consumer.setNamesrvAddr(info.namesrv);
  consumer.setNamesrvDomain(info.namesrv_domain);
  consumer.setGroupName(info.groupname);
  consumer.setInstanceName(info.groupname);
  consumer.registerMessageQueueListener(info.topic, NULL);
  consumer.start();
  std::vector<MQMessageQueue> mqs;

  try {
    consumer.fetchSubscribeMessageQueues(info.topic, mqs);
    auto iter = mqs.begin();
    for (; iter != mqs.end(); ++iter) {
      std::cout << "mq:" << (*iter).toString() << endl;
    }
  } catch (MQException& e) {
    std::cout << e << endl;
  }

  auto start = std::chrono::system_clock::now();
  auto iter = mqs.begin();
  for (; iter != mqs.end(); ++iter) {
    MQMessageQueue mq = (*iter);
    // if cluster model
    // putMessageQueueOffset(mq, g_consumer.fetchConsumeOffset(mq,true));
    // if broadcast model
    // putMessageQueueOffset(mq, your last consume offset);

    bool noNewMsg = false;
    do {
      try {
        PullResult result = consumer.pull(mq, "*", getMessageQueueOffset(mq), 32);
        g_msgCount += result.msgFoundList.size();
        std::cout << result.msgFoundList.size() << std::endl;
        // if pull request timeout or received NULL response, pullStatus will be
        // setted to BROKER_TIMEOUT,
        // And nextBeginOffset/minOffset/MaxOffset will be setted to 0
        if (result.pullStatus != BROKER_TIMEOUT) {
          putMessageQueueOffset(mq, result.nextBeginOffset);
          PrintPullResult(&result);
        } else {
          cout << "broker timeout occur" << endl;
        }
        switch (result.pullStatus) {
          case FOUND:
          case NO_MATCHED_MSG:
          case OFFSET_ILLEGAL:
          case BROKER_TIMEOUT:
            break;
          case NO_NEW_MSG:
            noNewMsg = true;
            break;
          default:
            break;
        }
      } catch (MQClientException& e) {
        std::cout << e << std::endl;
      }
    } while (!noNewMsg);
  }

  auto end = std::chrono::system_clock::now();
  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

  std::cout << "msg count: " << g_msgCount.load() << "\n";
  std::cout << "per msg time: " << duration.count() / (double)g_msgCount.load() << "ms \n"
            << "========================finished==============================\n";

  consumer.shutdown();
  return 0;
}