in src/consumer/DefaultMQPushConsumerImpl.cpp [829:923]
// Issues one asynchronous pull for the message queue carried by `pullRequest`.
//
// Each invocation ends in exactly one of three ways:
//   * the pull is handed to m_pPullAPIWrapper->pullKernelImpl() in ComMode_ASYNC,
//   * the request is re-queued through producePullMsgTaskLater() to be retried, or
//   * the function returns without doing anything further because the request
//     can no longer be locked, is flagged as dropped, or no callback is available.
//
// pullRequest: weak handle to the pull request; it is promoted to a shared_ptr
//              for the duration of this call.
void DefaultMQPushConsumerImpl::pullMessageAsync(boost::weak_ptr<PullRequest> pullRequest) {
  // Delays handed to producePullMsgTaskLater(); the log messages below describe
  // them as 1s and 3s respectively.
  const int kRetryDelay = 1000;
  const int kCacheFullDelay = 3000;

  boost::shared_ptr<PullRequest> req = pullRequest.lock();
  if (!req) {
    LOG_ERROR("Pull request is released, return");
    return;
  }

  MQMessageQueue& mq = req->m_messageQueue;
  if (req->isDropped()) {
    LOG_WARN("Pull request is set drop with mq:%s, return", mq.toString().c_str());
    return;
  }

  // With an orderly listener the queue lock is (re)acquired whenever the request
  // is not locked or its lock has expired; if that fails, try again later.
  const bool orderlyListener = m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly;
  if (orderlyListener && (!req->isLocked() || req->isLockExpired()) && !m_pRebalance->lock(mq)) {
    req->setLastPullTimestamp(UtilAll::currentTimeMillis());
    producePullMsgTaskLater(req, kRetryDelay);
    return;
  }

  // Too many messages already cached for this request: back off before pulling more.
  if (req->getCacheMsgCount() > m_maxMsgCacheSize) {
    LOG_INFO("Pull request for [%s] has Cached with %d Messages and The Max size is %d, Sleep 3s.",
             mq.toString().c_str(), req->getCacheMsgCount(), m_maxMsgCacheSize);
    req->setLastPullTimestamp(UtilAll::currentTimeMillis());
    producePullMsgTaskLater(req, kCacheFullDelay);
    return;
  }

  // The in-memory offset is only consulted in CLUSTERING mode, and the
  // commit-offset flag is only raised for a strictly positive value.
  int64 committedOffset = 0;
  if (CLUSTERING == getMessageModel()) {
    committedOffset = m_pOffsetStore->readOffset(mq, READ_FROM_MEMORY, getSessionCredentials());
  }
  const bool hasCommittedOffset = committedOffset > 0;

  SubscriptionData* subscription = m_pRebalance->getSubscriptionData(mq.getTopic());
  if (!subscription) {
    LOG_INFO("Can not get SubscriptionData of Pull request for [%s], Sleep 1s.", mq.toString().c_str());
    // No subscription data for this topic yet, retry later.
    producePullMsgTaskLater(req, kRetryDelay);
    return;
  }
  string subExpression = subscription->getSubString();

  int pullFlags = PullSysFlag::buildSysFlag(hasCommittedOffset,      // commitOffset
                                            true,                    // suspend
                                            !subExpression.empty(),  // subscription
                                            false);                  // class filter

  // NOTE(review): callbackArg lives on this stack frame and is passed by address
  // to an asynchronous call; presumably pullKernelImpl copies it before
  // returning -- confirm against its implementation.
  AsyncArg callbackArg;
  callbackArg.mq = mq;
  callbackArg.subData = *subscription;
  callbackArg.pPullWrapper = m_pPullAPIWrapper;

  // Re-check the drop flag right before issuing the pull.
  if (req->isDropped()) {
    LOG_WARN("Pull request is set as dropped with mq:%s, return", mq.toString().c_str());
    return;
  }

  try {
    req->setLastPullTimestamp(UtilAll::currentTimeMillis());

    AsyncPullCallback* callback = getAsyncPullCallBack(req, mq);
    if (!callback) {
      LOG_WARN("Can not get pull callback for:%s, Maybe this pull request has been released.",
               mq.toString().c_str());
      return;
    }

    m_pPullAPIWrapper->pullKernelImpl(mq,                             // 1
                                      subExpression,                  // 2
                                      subscription->getSubVersion(),  // 3
                                      req->getNextOffset(),           // 4
                                      32,                             // 5
                                      pullFlags,                      // 6
                                      committedOffset,                // 7
                                      1000 * 15,                      // 8
                                      m_asyncPullTimeout,             // 9
                                      ComMode_ASYNC,                  // 10
                                      callback,                       // 11
                                      getSessionCredentials(),        // 12
                                      &callbackArg);                  // 13
  } catch (MQException& ex) {
    LOG_ERROR("%s", ex.what());
    if (!req->isDropped()) {
      // Still owned by this consumer: schedule another attempt.
      LOG_INFO("Pull %s occur exception, restart 1s later.", mq.toString().c_str());
      producePullMsgTaskLater(req, kRetryDelay);
    } else {
      LOG_WARN("Pull request is set as dropped with mq:%s, return", mq.toString().c_str());
    }
  }
}