void ConsumeTask::process()

in cpp/source/rocketmq/ConsumeTask.cpp [126:231]


void ConsumeTask::process() {
  auto svc = service_.lock();
  if (!svc) {
    SPDLOG_DEBUG("ConsumeMessageService has destructed");
    return;
  }

  if (messages_.empty()) {
    SPDLOG_DEBUG("No more messages to process");
    return;
  }

  std::shared_ptr<PushConsumerImpl> consumer = svc->consumer().lock();

  auto self = shared_from_this();

  switch (next_step_) {
    case NextStep::Consume: {
      const auto& listener = svc->listener();
      auto it = messages_.begin();
      SPDLOG_DEBUG("Start to process message[message-id={}]", (*it)->id());
      svc->preHandle(**it);

      // Collect metrics of await_time
      auto await_time = std::chrono::system_clock::now() - (*it)->extension().decode_time;
      opencensus::stats::Record(
          {{consumer->stats().awaitTime(), MixAll::millisecondsOf(await_time)}},
          {{Tag::topicTag(), (*it)->topic()}, {Tag::clientIdTag(), consumer->config().client_id}, {Tag::consumerGroupTag(), consumer->groupName()}});

      std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();

      // Invoke user-defined-callback
      auto result = listener(**it);

      // Collect metrics of process_time
      auto duration = std::chrono::steady_clock::now() - start;
      switch (result) {
        case ConsumeResult::SUCCESS: {
          opencensus::stats::Record({{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
                                    {
                                        {Tag::topicTag(), (*it)->topic()},
                                        {Tag::clientIdTag(), consumer->config().client_id},
                                        {Tag::invocationStatusTag(), "success"},
                                        {Tag::consumerGroupTag(), consumer->groupName()}
                                    });
          break;
        }
        case ConsumeResult::FAILURE: {
          opencensus::stats::Record({{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
                                    {
                                        {Tag::topicTag(), (*it)->topic()},
                                        {Tag::clientIdTag(), consumer->config().client_id},
                                        {Tag::invocationStatusTag(), "failure"},
                                        {Tag::consumerGroupTag(), consumer->groupName()}
                                    });
          break;
        }
      }

      svc->postHandle(**it, result);

      switch (result) {
        case ConsumeResult::SUCCESS: {
          auto callback = std::bind(&ConsumeTask::onAck, self, std::placeholders::_1);
          svc->ack(**it, callback);
          break;
        }
        case ConsumeResult::FAILURE: {
          if (fifo_) {
            next_step_ = NextStep::Consume;
            // Increase delivery attempts.
            auto raw = const_cast<Message*>((*it).get());
            raw->mutableExtension().delivery_attempt++;
            schedule();
          } else {
            // For standard way of processing, Nack to server.
            auto callback = std::bind(&ConsumeTask::onNack, self, std::placeholders::_1);
            svc->nack(**it, callback);
          }
          break;
        }
      }
      break;
    }

    case NextStep::Ack: {
      assert(!messages_.empty());
      auto callback = std::bind(&ConsumeTask::onAck, self, std::placeholders::_1);
      svc->ack(*messages_[0], callback);
      break;
    }

    case NextStep::Nack: {
      auto callback = std::bind(&ConsumeTask::onNack, self, std::placeholders::_1);
      svc->nack(*messages_[0], callback);
      break;
    }

    case NextStep::Forward: {
      assert(!messages_.empty());
      auto callback = std::bind(&ConsumeTask::onForward, self, std::placeholders::_1);
      svc->forward(*messages_[0], callback);
      break;
    }
  }
}