in example/PullConsumeMessage.c [39:95]
/*
 * Example entry point for the pull consumer.
 *
 * Creates and starts a pull consumer, then for five rounds fetches the
 * message queues of "T_TestTopic" and pulls from every queue until Pull()
 * reports E_NO_MATCHED_MSG. The consumer is shut down and destroyed before
 * returning.
 *
 * Returns 0 on normal completion, 1 if the consumer could not be created.
 */
int main(int argc, char* argv[]) {
  int i = 0, j = 0;
  int ret = 0;
  int size = 0;
  CMessageQueue* mqs = NULL;
  CPullConsumer* consumer = NULL;

  printf("PullConsumer Initializing....\n");
  consumer = CreatePullConsumer("Group_Consumer_Test");
  if (consumer == NULL) {
    /* Nothing to shut down or destroy; every later call needs a valid handle. */
    printf("Create PullConsumer Failed\n");
    return 1;
  }
  SetPullConsumerNameServerAddress(consumer, "172.17.0.2:9876");
  StartPullConsumer(consumer);
  printf("Pull Consumer Start...\n");

  for (i = 1; i <= 5; i++) {
    printf("FetchSubscriptionMessageQueues : %d times\n", i);

    /* Start each round from a known state so a failed fetch can never leave
     * us holding the previous round's (already released) queue array. */
    mqs = NULL;
    size = 0;
    ret = FetchSubscriptionMessageQueues(consumer, "T_TestTopic", &mqs, &size);
    if (ret != OK) {
      printf("Get MQ Queue Failed,ErrorCode:%d\n", ret);
      /* Do not touch mqs/size after a failure; wait and try the next round. */
      thread_sleep(2000);
      continue;
    }
    printf("Get MQ Size:%d\n", size);

    for (j = 0; j < size; j++) {
      int noNewMsg = 0;
      long long tmpoffset = 0;
      printf("Pull Message For Topic:%s,Queue:%s,QueueId:%d\n", mqs[j].topic, mqs[j].brokerName, mqs[j].queueId);
      /* NOTE(review): only E_NO_MATCHED_MSG ends this loop; every other status
       * keeps polling. Confirm whether other statuses should terminate too. */
      do {
        int k = 0;
        CPullResult pullResult = Pull(consumer, &mqs[j], "*", tmpoffset, 32);
        /* Keep the current offset on a broker timeout so the same position is retried. */
        if (pullResult.pullStatus != E_BROKER_TIMEOUT) {
          tmpoffset = pullResult.nextBeginOffset;
        }
        printf("PullStatus:%d,MaxOffset:%lld,MinOffset:%lld,NextBegainOffset:%lld\n", pullResult.pullStatus,
               pullResult.maxOffset, pullResult.minOffset, pullResult.nextBeginOffset);
        switch (pullResult.pullStatus) {
          case E_FOUND:
            printf("Get Message Size:%d\n", pullResult.size);
            for (k = 0; k < pullResult.size; ++k) {
              printf("Got Message ID:%s,Body:%s\n", GetMessageId(pullResult.msgFoundList[k]),
                     GetMessageBody(pullResult.msgFoundList[k]));
            }
            break;
          case E_NO_MATCHED_MSG:
            noNewMsg = 1;
            break;
          default:
            noNewMsg = 0;
            break;
        }
        ReleasePullResult(pullResult);
        thread_sleep(100);
      } while (noNewMsg == 0);
      thread_sleep(1000);
    }

    thread_sleep(2000);
    ReleaseSubscriptionMessageQueue(mqs);
    /* Drop the stale pointer/count so they cannot be reused after release. */
    mqs = NULL;
    size = 0;
  }

  thread_sleep(5000);
  ShutdownPullConsumer(consumer);
  DestroyPullConsumer(consumer);
  printf("PullConsumer Shutdown!\n");
  return 0;
}