in src/consumer/DefaultMQPushConsumerImpl.cpp [46:165]
virtual void onSuccess(MQMessageQueue& mq, PullResult& result, bool bProducePullRequest) {
boost::shared_ptr<PullRequest> pullRequest = m_pullRequest.lock();
if (!pullRequest) {
LOG_WARN("Pull request for[%s] has been released", mq.toString().c_str());
return;
}
if (m_bShutdown) {
LOG_INFO("pullrequest for:%s in shutdown, return", (pullRequest->m_messageQueue).toString().c_str());
return;
}
if (pullRequest->isDropped()) {
LOG_INFO("Pull request for queue[%s] has been set as dropped. Will NOT pull this queue any more",
pullRequest->m_messageQueue.toString().c_str());
return;
}
switch (result.pullStatus) {
case FOUND: {
if (pullRequest->isDropped()) {
LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str());
break;
}
uint64 pullRT = UtilAll::currentTimeMillis() - pullRequest->getLastPullTimestamp();
StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(
pullRequest->m_messageQueue.getTopic(), m_callbackOwner->getGroupName(), pullRT);
pullRequest->setNextOffset(result.nextBeginOffset);
pullRequest->putMessage(result.msgFoundList);
if (!result.msgFoundList.empty()) {
StatsServerManager::getInstance()->getConsumeStatServer()->incPullTPS(
pullRequest->m_messageQueue.getTopic(), m_callbackOwner->getGroupName(), result.msgFoundList.size());
}
m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(pullRequest, result.msgFoundList);
if (bProducePullRequest) {
m_callbackOwner->producePullMsgTask(pullRequest);
} else {
LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
(pullRequest->m_messageQueue).toString().c_str());
}
LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ", nextBeginOffset:%lld",
(pullRequest->m_messageQueue).toString().c_str(), result.msgFoundList.size(), result.nextBeginOffset);
break;
}
case NO_NEW_MSG: {
if (pullRequest->isDropped()) {
LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str());
break;
}
pullRequest->setNextOffset(result.nextBeginOffset);
if ((pullRequest->getCacheMsgCount() == 0) && (result.nextBeginOffset > 0)) {
m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
}
if (bProducePullRequest) {
m_callbackOwner->producePullMsgTask(pullRequest);
} else {
LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
(pullRequest->m_messageQueue).toString().c_str());
}
LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld", pullRequest->m_messageQueue.toString().c_str(),
result.nextBeginOffset);
break;
}
case NO_MATCHED_MSG: {
if (pullRequest->isDropped()) {
LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str());
break;
}
pullRequest->setNextOffset(result.nextBeginOffset);
if ((pullRequest->getCacheMsgCount() == 0) && (result.nextBeginOffset > 0)) {
m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
}
if (bProducePullRequest) {
m_callbackOwner->producePullMsgTask(pullRequest);
} else {
LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
(pullRequest->m_messageQueue).toString().c_str());
}
LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld", pullRequest->m_messageQueue.toString().c_str(),
result.nextBeginOffset);
break;
}
case OFFSET_ILLEGAL: {
if (pullRequest->isDropped()) {
LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str());
break;
}
pullRequest->setNextOffset(result.nextBeginOffset);
if (bProducePullRequest) {
m_callbackOwner->producePullMsgTask(pullRequest);
} else {
LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
(pullRequest->m_messageQueue).toString().c_str());
}
LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld", pullRequest->m_messageQueue.toString().c_str(),
result.nextBeginOffset);
break;
}
case BROKER_TIMEOUT: {
if (pullRequest->isDropped()) {
LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str());
break;
}
LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
pullRequest->setNextOffset(result.nextBeginOffset);
if (bProducePullRequest) {
m_callbackOwner->producePullMsgTask(pullRequest);
} else {
LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
(pullRequest->m_messageQueue).toString().c_str());
}
break;
}
}
}