in cpp/source/rocketmq/PushConsumerImpl.cpp [284:336]
void PushConsumerImpl::syncProcessQueue(const std::string& topic,
const std::shared_ptr<TopicAssignment>& topic_assignment,
const FilterExpression& filter_expression) {
const std::vector<rmq::Assignment>& assignment_list = topic_assignment->assignmentList();
std::vector<rmq::MessageQueue> message_queue_list;
message_queue_list.reserve(assignment_list.size());
for (const auto& assignment : assignment_list) {
message_queue_list.push_back(assignment.message_queue());
}
std::vector<rmq::MessageQueue> current;
{
absl::MutexLock lock(&process_queue_table_mtx_);
for (auto it = process_queue_table_.begin(); it != process_queue_table_.end();) {
if (topic != it->second->messageQueue().topic().name()) {
it++;
continue;
}
if (std::none_of(
message_queue_list.cbegin(), message_queue_list.cend(),
[&](const rmq::MessageQueue& message_queue) { return it->second->messageQueue() == message_queue; })) {
SPDLOG_INFO("Stop receiving messages from {} as it is not assigned to current client according to latest "
"assignment result from load balancer",
simpleNameOf(it->second->messageQueue()));
process_queue_table_.erase(it++);
} else {
if (!it->second || it->second->expired()) {
SPDLOG_WARN("ProcessQueue={} is expired. Remove it for now.", it->first);
process_queue_table_.erase(it++);
continue;
}
current.push_back(it->second->messageQueue());
it++;
}
}
}
for (const auto& message_queue : message_queue_list) {
if (std::none_of(current.cbegin(), current.cend(),
[&](const rmq::MessageQueue& item) { return item == message_queue; })) {
SPDLOG_DEBUG("Start to receive message from {} according to latest assignment info from load balancer",
simpleNameOf(message_queue));
std::string attempt_id;
if (!receiveMessage(message_queue, filter_expression, attempt_id)) {
if (!active()) {
SPDLOG_WARN("Failed to initiate receive message request-response-cycle for {}", simpleNameOf(message_queue));
// TODO: remove it from current assignment such that a second attempt will be made again in the next round.
}
}
}
}
}