in src/consumer/ConsumeMessageConcurrentlyService.cpp [143:324]
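// Consume a batch of pulled messages on a consume-pool thread: run trace hooks
// around the user listener, record RT/TPS stats, send failed messages back to
// the broker (clustering mode), and finally advance the consume offset.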
void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullRequest> pullRequest,
vector<MQMessageExt>& msgs) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
if (request->isDropped()) {
LOG_WARN("the pull request for %s Had been dropped before", request->m_messageQueue.toString().c_str());
request->clearAllMsgs(); // add clear operation to avoid bad state when
// dropped pullRequest returns normal
return;
}
if (msgs.empty()) {
LOG_WARN("the msg of pull result is NULL,its mq:%s", (request->m_messageQueue).toString().c_str());
return;
}
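  // The trace context travels through the before/after consume hooks when
  // message trace is enabled on this consumer.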
ConsumeMessageContext consumeMessageContext;
  DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
  if (!pConsumer) {
    // Guard the downcast before any dereference; groupName is needed throughout.
    LOG_ERROR("Consumer downcast failed for mq:%s", (request->m_messageQueue).toString().c_str());
    return;
  }
  std::string groupName = pConsumer->getGroupName();
  if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
    consumeMessageContext.setDefaultMQPushConsumer(pConsumer);
    consumeMessageContext.setConsumerGroup(groupName);
    consumeMessageContext.setMessageQueue(request->m_messageQueue);
    consumeMessageContext.setMsgList(msgs);
    consumeMessageContext.setSuccess(false);
    consumeMessageContext.setNameSpace(pConsumer->getNameSpace());
    pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext);
  }
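  // status defaults to CONSUME_SUCCESS, so a missing listener never triggers send-back.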
ConsumeStatus status = CONSUME_SUCCESS;
if (m_pMessageListener != NULL) {
resetRetryTopic(msgs);
request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
LOG_DEBUG("=====Receive Messages,Topic[%s], MsgId[%s],Body[%s],RetryTimes[%d]", msgs[0].getTopic().c_str(),
msgs[0].getMsgId().c_str(), msgs[0].getBody().c_str(), msgs[0].getReconsumeTimes());
if (m_pConsumer->isUseNameSpaceMode()) {
MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
}
if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
// For open trace message, consume message one by one.
for (size_t i = 0; i < msgs.size(); ++i) {
LOG_DEBUG("=====Trace Receive Messages,Topic[%s], MsgId[%s],Body[%s],RetryTimes[%d]",
msgs[i].getTopic().c_str(), msgs[i].getMsgId().c_str(), msgs[i].getBody().c_str(),
msgs[i].getReconsumeTimes());
std::vector<MQMessageExt> msgInner;
msgInner.push_back(msgs[i]);
if (status != CONSUME_SUCCESS) {
          // every message after the first failure is marked RECONSUME_LATER as well.
status = RECONSUME_LATER;
consumeMessageContext.setMsgIndex(i);
consumeMessageContext.setStatus("RECONSUME_LATER");
consumeMessageContext.setSuccess(false);
pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
continue;
}
uint64 startTimeStamp = UtilAll::currentTimeMillis();
try {
status = m_pMessageListener->consumeMessage(msgInner);
} catch (...) {
status = RECONSUME_LATER;
LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
}
uint64 consumerRT = UtilAll::currentTimeMillis() - startTimeStamp;
StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(request->m_messageQueue.getTopic(),
groupName, consumerRT);
        consumeMessageContext.setMsgIndex(i);  // marks the message position; batch consume is not supported on this path
if (status == CONSUME_SUCCESS) {
consumeMessageContext.setStatus("CONSUME_SUCCESS");
consumeMessageContext.setSuccess(true);
} else {
status = RECONSUME_LATER;
consumeMessageContext.setStatus("RECONSUME_LATER");
consumeMessageContext.setSuccess(false);
}
pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
}
} else {
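      // Trace disabled: hand the whole batch to the listener in one call and
      // record a single RT sample for the batch.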
uint64 startTimeStamp = UtilAll::currentTimeMillis();
try {
status = m_pMessageListener->consumeMessage(msgs);
} catch (...) {
status = RECONSUME_LATER;
LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
}
uint64 consumerRT = UtilAll::currentTimeMillis() - startTimeStamp;
StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(request->m_messageQueue.getTopic(),
groupName, consumerRT, msgs.size());
}
}
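  // ackIndex marks where acknowledgement stops: msgs.size() when the whole
  // batch succeeded, -1 when every message needs redelivery.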
int ackIndex = -1;
switch (status) {
case CONSUME_SUCCESS:
ackIndex = msgs.size();
break;
case RECONSUME_LATER:
ackIndex = -1;
break;
default:
break;
}
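  // Failure handling depends on the message model: broadcasting drops failed
  // messages, clustering sends them back to the broker for redelivery;
  // send-back failures are collected in localRetryMsgs for a local retry.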
std::vector<MQMessageExt> localRetryMsgs;
switch (m_pConsumer->getMessageModel()) {
case BROADCASTING: {
      // Note: re-consume in broadcasting mode should be done by the application,
      // as it would have a big impact on the broker cluster
if (ackIndex != (int)msgs.size())
LOG_WARN("BROADCASTING, the message consume failed, drop it:%s", (request->m_messageQueue).toString().c_str());
break;
}
case CLUSTERING: {
      // record consume success/failure TPS for the stats server
if (ackIndex == -1) {
StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeFailedTPS(
request->m_messageQueue.getTopic(), groupName, msgs.size());
} else {
StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeOKTPS(request->m_messageQueue.getTopic(),
groupName, msgs.size());
}
      // send failed messages back to the broker
for (size_t i = ackIndex + 1; i < msgs.size(); i++) {
LOG_DEBUG("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",
(request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
msgs[i].getReconsumeTimes());
if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY) {
string brokerName = request->m_messageQueue.getBrokerName();
if (m_pConsumer->isUseNameSpaceMode()) {
MessageAccessor::withNameSpace(msgs[i], m_pConsumer->getNameSpace());
}
if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) {
LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",
(request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
msgs[i].getReconsumeTimes());
msgs[i].setReconsumeTimes(msgs[i].getReconsumeTimes() + 1);
localRetryMsgs.push_back(msgs[i]);
}
}
}
break;
}
default:
break;
}
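  // Strip the messages that could not be sent back out of msgs (matched by
  // queue offset), so removeMessage() below leaves them in the process queue
  // and the committed offset cannot advance past them.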
if (!localRetryMsgs.empty()) {
LOG_ERROR("Client side re-consume launched due to both message consuming and SDK send-back retry failure");
for (std::vector<MQMessageExt>::iterator itOrigin = msgs.begin(); itOrigin != msgs.end();) {
bool remove = false;
for (std::vector<MQMessageExt>::iterator itRetry = localRetryMsgs.begin(); itRetry != localRetryMsgs.end();
itRetry++) {
if (itRetry->getQueueOffset() == itOrigin->getQueueOffset()) {
remove = true;
break;
}
}
if (remove) {
itOrigin = msgs.erase(itOrigin);
} else {
itOrigin++;
}
}
}
// update offset
int64 offset = request->removeMessage(msgs);
if (offset >= 0) {
m_pConsumer->updateConsumeOffset(request->m_messageQueue, offset);
} else {
LOG_WARN("Note: Get local offset for mq:%s failed, may be it is updated before. skip..",
(request->m_messageQueue).toString().c_str());
}
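  // Hand the locally retried messages back to this consume service after a delay.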
  if (!localRetryMsgs.empty()) {
    LOG_INFO("Sending [" SIZET_FMT "] messages back to mq:%s failed, re-consume them locally after 1s.",
             localRetryMsgs.size(), (request->m_messageQueue).toString().c_str());
    submitConsumeRequestLater(request, localRetryMsgs, 1000);
  }
}