void PushConsumerImpl::syncProcessQueue()

in cpp/source/rocketmq/PushConsumerImpl.cpp [284:336]


void PushConsumerImpl::syncProcessQueue(const std::string& topic,
                                        const std::shared_ptr<TopicAssignment>& topic_assignment,
                                        const FilterExpression& filter_expression) {
  const std::vector<rmq::Assignment>& assignment_list = topic_assignment->assignmentList();
  std::vector<rmq::MessageQueue> message_queue_list;
  message_queue_list.reserve(assignment_list.size());
  for (const auto& assignment : assignment_list) {
    message_queue_list.push_back(assignment.message_queue());
  }

  std::vector<rmq::MessageQueue> current;
  {
    absl::MutexLock lock(&process_queue_table_mtx_);
    for (auto it = process_queue_table_.begin(); it != process_queue_table_.end();) {
      if (topic != it->second->messageQueue().topic().name()) {
        it++;
        continue;
      }

      if (std::none_of(
              message_queue_list.cbegin(), message_queue_list.cend(),
              [&](const rmq::MessageQueue& message_queue) { return it->second->messageQueue() == message_queue; })) {
        SPDLOG_INFO("Stop receiving messages from {} as it is not assigned to current client according to latest "
                    "assignment result from load balancer",
                    simpleNameOf(it->second->messageQueue()));
        process_queue_table_.erase(it++);
      } else {
        if (!it->second || it->second->expired()) {
          SPDLOG_WARN("ProcessQueue={} is expired. Remove it for now.", it->first);
          process_queue_table_.erase(it++);
          continue;
        }
        current.push_back(it->second->messageQueue());
        it++;
      }
    }
  }

  for (const auto& message_queue : message_queue_list) {
    if (std::none_of(current.cbegin(), current.cend(),
                     [&](const rmq::MessageQueue& item) { return item == message_queue; })) {
      SPDLOG_DEBUG("Start to receive message from {} according to latest assignment info from load balancer",
                  simpleNameOf(message_queue));
      std::string attempt_id;
      if (!receiveMessage(message_queue, filter_expression, attempt_id)) {
        if (!active()) {
          SPDLOG_WARN("Failed to initiate receive message request-response-cycle for {}", simpleNameOf(message_queue));
          // TODO: remove it from current assignment such that a second attempt will be made again in the next round.
        }
      }
    }
  }
}