in cpp/source/rocketmq/ConsumeTask.cpp [126:231]
void ConsumeTask::process() {
auto svc = service_.lock();
if (!svc) {
SPDLOG_DEBUG("ConsumeMessageService has destructed");
return;
}
if (messages_.empty()) {
SPDLOG_DEBUG("No more messages to process");
return;
}
std::shared_ptr<PushConsumerImpl> consumer = svc->consumer().lock();
auto self = shared_from_this();
switch (next_step_) {
case NextStep::Consume: {
const auto& listener = svc->listener();
auto it = messages_.begin();
SPDLOG_DEBUG("Start to process message[message-id={}]", (*it)->id());
svc->preHandle(**it);
// Collect metrics of await_time
auto await_time = std::chrono::system_clock::now() - (*it)->extension().decode_time;
opencensus::stats::Record(
{{consumer->stats().awaitTime(), MixAll::millisecondsOf(await_time)}},
{{Tag::topicTag(), (*it)->topic()}, {Tag::clientIdTag(), consumer->config().client_id}, {Tag::consumerGroupTag(), consumer->groupName()}});
std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
// Invoke user-defined-callback
auto result = listener(**it);
// Collect metrics of process_time
auto duration = std::chrono::steady_clock::now() - start;
switch (result) {
case ConsumeResult::SUCCESS: {
opencensus::stats::Record({{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
{
{Tag::topicTag(), (*it)->topic()},
{Tag::clientIdTag(), consumer->config().client_id},
{Tag::invocationStatusTag(), "success"},
{Tag::consumerGroupTag(), consumer->groupName()}
});
break;
}
case ConsumeResult::FAILURE: {
opencensus::stats::Record({{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
{
{Tag::topicTag(), (*it)->topic()},
{Tag::clientIdTag(), consumer->config().client_id},
{Tag::invocationStatusTag(), "failure"},
{Tag::consumerGroupTag(), consumer->groupName()}
});
break;
}
}
svc->postHandle(**it, result);
switch (result) {
case ConsumeResult::SUCCESS: {
auto callback = std::bind(&ConsumeTask::onAck, self, std::placeholders::_1);
svc->ack(**it, callback);
break;
}
case ConsumeResult::FAILURE: {
if (fifo_) {
next_step_ = NextStep::Consume;
// Increase delivery attempts.
auto raw = const_cast<Message*>((*it).get());
raw->mutableExtension().delivery_attempt++;
schedule();
} else {
// For standard way of processing, Nack to server.
auto callback = std::bind(&ConsumeTask::onNack, self, std::placeholders::_1);
svc->nack(**it, callback);
}
break;
}
}
break;
}
case NextStep::Ack: {
assert(!messages_.empty());
auto callback = std::bind(&ConsumeTask::onAck, self, std::placeholders::_1);
svc->ack(*messages_[0], callback);
break;
}
case NextStep::Nack: {
auto callback = std::bind(&ConsumeTask::onNack, self, std::placeholders::_1);
svc->nack(*messages_[0], callback);
break;
}
case NextStep::Forward: {
assert(!messages_.empty());
auto callback = std::bind(&ConsumeTask::onForward, self, std::placeholders::_1);
svc->forward(*messages_[0], callback);
break;
}
}
}