in src/consumer/ConsumeMessageOrderlyService.cpp [139:259]
void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> pullRequest) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
bool bGetMutex = false;
boost::unique_lock<boost::timed_mutex> lock(request->getPullRequestCriticalSection(), boost::try_to_lock);
if (!lock.owns_lock()) {
if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(1))) {
LOG_ERROR("ConsumeRequest of:%s get timed_mutex timeout", request->m_messageQueue.toString().c_str());
return;
} else {
bGetMutex = true;
}
} else {
bGetMutex = true;
}
if (!bGetMutex) {
// LOG_INFO("pullrequest of mq:%s consume inprogress",
// request->m_messageQueue.toString().c_str());
return;
}
if (!request || request->isDropped()) {
LOG_WARN("the pull result is NULL or Had been dropped");
request->clearAllMsgs(); // add clear operation to avoid bad state when
// dropped pullRequest returns normal
return;
}
if (m_pMessageListener) {
if ((request->isLocked() && !request->isLockExpired()) || m_pConsumer->getMessageModel() == BROADCASTING) {
// DefaultMQPushConsumer* pConsumer = (DefaultMQPushConsumer*)m_pConsumer;
uint64_t beginTime = UtilAll::currentTimeMillis();
bool continueConsume = true;
while (continueConsume) {
if ((UtilAll::currentTimeMillis() - beginTime) > m_MaxTimeConsumeContinuously) {
LOG_INFO("Continuely consume %s more than 60s, consume it 1s later",
request->m_messageQueue.toString().c_str());
tryLockLaterAndReconsumeDelay(request, false, 1000);
break;
}
vector<MQMessageExt> msgs;
// request->takeMessages(msgs, pConsumer->getConsumeMessageBatchMaxSize());
request->takeMessages(msgs, 1);
if (!msgs.empty()) {
request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
if (m_pConsumer->isUseNameSpaceMode()) {
MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
}
ConsumeMessageContext consumeMessageContext;
DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
std::string groupName = pConsumer->getGroupName();
if (pConsumer) {
if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
consumeMessageContext.setDefaultMQPushConsumer(pConsumer);
consumeMessageContext.setConsumerGroup(pConsumer->getGroupName());
consumeMessageContext.setMessageQueue(request->m_messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
consumeMessageContext.setNameSpace(pConsumer->getNameSpace());
pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext);
}
}
uint64 startTimeStamp = UtilAll::currentTimeMillis();
ConsumeStatus consumeStatus = m_pMessageListener->consumeMessage(msgs);
uint64 consumerRT = UtilAll::currentTimeMillis() - startTimeStamp;
StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(request->m_messageQueue.getTopic(),
groupName, consumerRT);
if (consumeStatus == RECONSUME_LATER) {
StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeFailedTPS(
request->m_messageQueue.getTopic(), groupName, 1);
if (pConsumer) {
consumeMessageContext.setMsgIndex(0);
consumeMessageContext.setStatus("RECONSUME_LATER");
consumeMessageContext.setSuccess(false);
pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
}
if (msgs[0].getReconsumeTimes() <= 15) {
msgs[0].setReconsumeTimes(msgs[0].getReconsumeTimes() + 1);
request->makeMessageToCosumeAgain(msgs);
continueConsume = false;
tryLockLaterAndReconsumeDelay(request, false, 1000);
} else {
// need change to reconsumer delay level and print log.
LOG_INFO("Local Consume failed [%d] times, change [%s] delay to 5s.", msgs[0].getReconsumeTimes(),
msgs[0].getMsgId().c_str());
msgs[0].setReconsumeTimes(msgs[0].getReconsumeTimes() + 1);
continueConsume = false;
request->makeMessageToCosumeAgain(msgs);
tryLockLaterAndReconsumeDelay(request, false, 5000);
}
} else {
StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeOKTPS(
request->m_messageQueue.getTopic(), groupName, 1);
if (pConsumer) {
consumeMessageContext.setMsgIndex(0);
consumeMessageContext.setStatus("CONSUME_SUCCESS");
consumeMessageContext.setSuccess(true);
pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
}
m_pConsumer->updateConsumeOffset(request->m_messageQueue, request->commit());
}
} else {
continueConsume = false;
}
msgs.clear();
if (m_shutdownInprogress) {
LOG_INFO("shutdown inprogress, break the consuming");
return;
}
}
LOG_DEBUG("consume once exit of mq:%s", request->m_messageQueue.toString().c_str());
} else {
LOG_ERROR("message queue:%s was not locked", request->m_messageQueue.toString().c_str());
tryLockLaterAndReconsumeDelay(request, true, 1000);
}
}
}