in src/consumer/DefaultMQPushConsumerImpl.cpp [632:789]
// Performs one synchronous pull for the message queue carried by `pullRequest`
// and, unless the request has gone away, schedules the follow-up pull.
//
// Steps, in order:
//   1. Return immediately if the request has been released (weak_ptr expired)
//      or is marked dropped.
//   2. For an orderly message listener, make sure the queue is locked; if the
//      lock cannot be obtained the pull is retried after a delay.
//   3. If more messages are cached than m_maxMsgCacheSize, retry after a delay.
//   4. Look up the subscription for the topic; retry after a delay if missing.
//   5. Call pullKernelImpl in ComMode_SYNC, post-process the result and act on
//      its pull status (store messages / advance offsets / reschedule).
//
// @param pullRequest  weak handle to the request; it is locked into a
//                     shared_ptr for the duration of this call.
//
// An MQException thrown while pulling is logged and the pull is retried after
// a delay. Other exception types are not handled here.
void DefaultMQPushConsumerImpl::pullMessage(boost::weak_ptr<PullRequest> pullRequest) {
  // Delay handed to producePullMsgTaskLater on every "try again later" path
  // (the accompanying log messages call this "1s").
  const int retryDelayMs = 1000;

  boost::shared_ptr<PullRequest> request = pullRequest.lock();
  if (!request) {
    LOG_ERROR("Pull request is released, return");
    return;
  }
  if (request->isDropped()) {
    LOG_WARN("Pull request is set drop with mq:%s, return", (request->m_messageQueue).toString().c_str());
    return;
  }

  MQMessageQueue& messageQueue = request->m_messageQueue;

  // Orderly consumption: the queue must be locked before pulling. Try to
  // (re)acquire the lock when it is missing or expired.
  if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) {
    if (!request->isLocked() || request->isLockExpired()) {
      if (!m_pRebalance->lock(messageQueue)) {
        request->setLastPullTimestamp(UtilAll::currentTimeMillis());
        producePullMsgTaskLater(request, retryDelayMs);
        return;
      }
    }
  }

  // Flow control: too many messages are still cached for this request.
  if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
    LOG_INFO("Sync Pull request for %s has Cached with %d Messages and The Max size is %d, Sleep 1s.",
             (request->m_messageQueue).toString().c_str(), request->getCacheMsgCount(), m_maxMsgCacheSize);
    request->setLastPullTimestamp(UtilAll::currentTimeMillis());
    producePullMsgTaskLater(request, retryDelayMs);
    return;
  }

  // In CLUSTERING mode the in-memory offset is sent along with the pull, but
  // only when it is positive.
  bool commitOffsetEnable = false;
  int64 commitOffsetValue = 0;
  if (CLUSTERING == getMessageModel()) {
    commitOffsetValue = m_pOffsetStore->readOffset(messageQueue, READ_FROM_MEMORY, getSessionCredentials());
    if (commitOffsetValue > 0) {
      commitOffsetEnable = true;
    }
  }

  SubscriptionData* pSdata = m_pRebalance->getSubscriptionData(messageQueue.getTopic());
  if (pSdata == NULL) {
    LOG_INFO("Can not get SubscriptionData of Pull request for [%s], Sleep 1s.",
             (request->m_messageQueue).toString().c_str());
    producePullMsgTaskLater(request, retryDelayMs);
    return;
  }
  string subExpression = pSdata->getSubString();

  int sysFlag = PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
                                          false,                   // suspend
                                          !subExpression.empty(),  // subscription
                                          false);                  // class filter

  // Re-check right before issuing the synchronous pull: the request may have
  // been dropped while the checks above were running.
  if (request->isDropped()) {
    LOG_WARN("Pull request is set as dropped with mq:%s, return", (request->m_messageQueue).toString().c_str());
    return;
  }

  try {
    uint64 startTimeStamp = UtilAll::currentTimeMillis();
    request->setLastPullTimestamp(startTimeStamp);
    unique_ptr<PullResult> result(m_pPullAPIWrapper->pullKernelImpl(messageQueue,              // 1
                                                                    subExpression,             // 2
                                                                    pSdata->getSubVersion(),   // 3
                                                                    request->getNextOffset(),  // 4
                                                                    32,                        // 5
                                                                    sysFlag,                   // 6
                                                                    commitOffsetValue,         // 7
                                                                    1000 * 15,                 // 8
                                                                    1000 * 30,                 // 9
                                                                    ComMode_SYNC,              // 10
                                                                    NULL, getSessionCredentials()));
    PullResult pullResult = m_pPullAPIWrapper->processPullResult(messageQueue, result.get(), pSdata);

    switch (pullResult.pullStatus) {
      case FOUND: {
        // Report how long this pull took to the stats server.
        uint64 pullRT = UtilAll::currentTimeMillis() - startTimeStamp;
        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(messageQueue.getTopic(), getGroupName(),
                                                                                pullRT);
        // The request may have been dropped while the pull was in flight; in
        // that case the pulled messages are discarded and nothing is rescheduled.
        if (request->isDropped()) {
          LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s",
                   messageQueue.toString().c_str());
          break;
        }
        request->setNextOffset(pullResult.nextBeginOffset);
        request->putMessage(pullResult.msgFoundList);
        if (!pullResult.msgFoundList.empty()) {
          StatsServerManager::getInstance()->getConsumeStatServer()->incPullTPS(messageQueue.getTopic(), getGroupName(),
                                                                                pullResult.msgFoundList.size());
        }
        m_consumerService->submitConsumeRequest(request, pullResult.msgFoundList);
        producePullMsgTask(request);
        LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld", messageQueue.toString().c_str(),
                  pullResult.msgFoundList.size(), pullResult.nextBeginOffset);
        break;
      }
      case NO_NEW_MSG: {
        if (request->isDropped()) {
          LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s",
                   messageQueue.toString().c_str());
          break;
        }
        request->setNextOffset(pullResult.nextBeginOffset);
        // Nothing cached locally: move the consume offset to the (positive)
        // next begin offset returned with the pull result.
        if ((request->getCacheMsgCount() == 0) && (pullResult.nextBeginOffset > 0)) {
          updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
        }
        producePullMsgTask(request);
        LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld", messageQueue.toString().c_str(), pullResult.nextBeginOffset);
        break;
      }
      case NO_MATCHED_MSG: {
        if (request->isDropped()) {
          LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s",
                   messageQueue.toString().c_str());
          break;
        }
        request->setNextOffset(pullResult.nextBeginOffset);
        // Same offset handling as NO_NEW_MSG.
        if ((request->getCacheMsgCount() == 0) && (pullResult.nextBeginOffset > 0)) {
          updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
        }
        producePullMsgTask(request);
        LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld", messageQueue.toString().c_str(),
                  pullResult.nextBeginOffset);
        break;
      }
      case OFFSET_ILLEGAL: {
        if (request->isDropped()) {
          LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s",
                   messageQueue.toString().c_str());
          break;
        }
        request->setNextOffset(pullResult.nextBeginOffset);
        producePullMsgTask(request);
        LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld", messageQueue.toString().c_str(),
                  pullResult.nextBeginOffset);
        break;
      }
      case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker
                              // will not returns this status, so this case
                              // could not be entered.
        LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
        request->setNextOffset(pullResult.nextBeginOffset);
        producePullMsgTask(request);
        break;
      }
      default: {
        // Without this branch an unrecognised status would leave the switch
        // without rescheduling anything, silently ending pulls for this queue.
        // The returned offset is not trusted here; just try again later.
        LOG_ERROR("Unexpected pull status:%d for %s, retry 1s later.", static_cast<int>(pullResult.pullStatus),
                  messageQueue.toString().c_str());
        if (!request->isDropped()) {
          producePullMsgTaskLater(request, retryDelayMs);
        }
        break;
      }
    }
  } catch (const MQException& e) {
    LOG_ERROR("%s", e.what());
    LOG_WARN("Pull %s occur exception, restart 1s later.", messageQueue.toString().c_str());
    producePullMsgTaskLater(request, retryDelayMs);
  }
}