bool PushConsumerImpl::selectBroker()

in cpp/source/rocketmq/PushConsumerImpl.cpp [194:230]


/**
 * Picks the broker host to contact for the given topic route.
 *
 * Candidate queues are visited round-robin, starting at the position returned
 * by TopicAssignment::getAndIncreaseQueryWhichBroker(). Only queues whose
 * broker id equals MixAll::MASTER_BROKER_ID and whose permission is readable
 * are eligible. Among the eligible queues:
 *   1. the first one whose URL is contained in the set filled by
 *      endpointsInUse() wins;
 *   2. otherwise the first eligible queue in visiting order is used.
 *
 * @param topic_route_data Route data of the topic; may be null.
 * @param broker_host      Output: URL of the selected broker. Left untouched
 *                         when the function returns false.
 * @return true if a broker was selected; false if endpointsInUse() yields no
 *         endpoint, the route is null/empty, or no queue is eligible.
 */
bool PushConsumerImpl::selectBroker(const TopicRouteDataPtr& topic_route_data, std::string& broker_host) {
  absl::flat_hash_set<std::string> endpoints;
  endpointsInUse(endpoints);
  if (endpoints.empty()) {
    SPDLOG_WARN("No broker is available");
    return false;
  }

  if (!topic_route_data) {
    return false;
  }

  // Query the route once and only take references to its entries, so that
  // message queues are not copied on every iteration.
  const auto& queues = topic_route_data->messageQueues();
  if (queues.empty()) {
    return false;
  }

  const uint32_t index = TopicAssignment::getAndIncreaseQueryWhichBroker();

  // Use 64-bit arithmetic for the rotation: the start index is an
  // ever-increasing 32-bit counter, and `index + queue_count` computed in
  // 32 bits wraps around near UINT32_MAX, which would skip the scan entirely.
  const uint64_t queue_count = queues.size();
  const uint64_t start = index;

  std::string fallback_host;
  bool has_fallback = false;

  for (uint64_t offset = 0; offset < queue_count; ++offset) {
    const auto& message_queue = queues.at(static_cast<std::size_t>((start + offset) % queue_count));
    if (MixAll::MASTER_BROKER_ID != message_queue.broker().id() || !readable(message_queue.permission())) {
      continue;
    }

    std::string current_host = urlOf(message_queue);

    // Prefer a broker whose URL is among the endpoints reported by endpointsInUse().
    if (endpoints.contains(current_host)) {
      broker_host = current_host;
      return true;
    }

    // Remember the first eligible broker in case none matches those endpoints.
    if (!has_fallback) {
      fallback_host = current_host;
      has_fallback = true;
    }
  }

  if (has_fallback) {
    broker_host = fallback_host;
    return true;
  }
  return false;
}