in cpp/source/rocketmq/PushConsumerImpl.cpp [194:230]
#include <utility>
/**
 * Pick the broker host to use for the given topic route.
 *
 * Only message queues that belong to a master broker
 * (MixAll::MASTER_BROKER_ID) and whose permission is readable are eligible.
 * Among those, a queue whose URL is contained in the set filled by
 * endpointsInUse() is preferred; if none matches, the first eligible queue
 * in scan order is used instead.
 *
 * The scan starts at a rotating offset taken from
 * TopicAssignment::getAndIncreaseQueryWhichBroker() and visits every queue
 * exactly once, wrapping around the end of the list.
 *
 * @param topic_route_data Route data to choose from; may be null.
 * @param broker_host      Receives the selected host. Left untouched when
 *                         the function returns false.
 * @return true if a host was written to broker_host; false if the
 *         in-use endpoint set is empty, the route is null or has no
 *         message queues, or no queue is an eligible master/readable one.
 */
bool PushConsumerImpl::selectBroker(const TopicRouteDataPtr& topic_route_data, std::string& broker_host) {
  absl::flat_hash_set<std::string> endpoints;
  endpointsInUse(endpoints);
  if (endpoints.empty()) {
    SPDLOG_WARN("No broker is available");
    return false;
  }

  if (!topic_route_data) {
    return false;
  }

  // Bind once by const reference: avoids re-fetching the container on every
  // iteration and keeps the element references below valid even if
  // messageQueues() were to return by value (the temporary's lifetime is
  // extended by this binding).
  const auto& message_queues = topic_route_data->messageQueues();
  if (message_queues.empty()) {
    return false;
  }

  const uint32_t queue_count = static_cast<uint32_t>(message_queues.size());
  const uint32_t index = TopicAssignment::getAndIncreaseQueryWhichBroker();

  // Reduce the rotating counter modulo the queue count *before* iterating.
  // Looping on `i < index + queue_count` wraps around in 32-bit arithmetic
  // once the counter gets close to UINT32_MAX, which would skip every queue.
  uint32_t position = index % queue_count;

  bool has_fallback = false;
  std::string fallback_host;

  for (uint32_t visited = 0; visited < queue_count; ++visited) {
    const auto& message_queue = message_queues.at(position);
    // Advance with an explicit wrap; position + 1 never exceeds queue_count.
    position = (position + 1 == queue_count) ? 0 : position + 1;

    if (MixAll::MASTER_BROKER_ID != message_queue.broker().id() || !readable(message_queue.permission())) {
      continue;
    }

    std::string current_host = urlOf(message_queue);

    // Prefer a host that is among the endpoints currently in use.
    if (endpoints.contains(current_host)) {
      broker_host = std::move(current_host);
      return true;
    }

    // Otherwise remember the first eligible host as the fallback choice.
    if (!has_fallback) {
      fallback_host = std::move(current_host);
      has_fallback = true;
    }
  }

  if (has_fallback) {
    broker_host = std::move(fallback_host);
    return true;
  }
  return false;
}