int ConsulNamingService::GetServers()

in src/brpc/policy/consul_naming_service.cpp [77:210]


int ConsulNamingService::GetServers(const char* service_name,
                                    std::vector<ServerNode>* servers) {
    if (!_consul_connected) {
        ChannelOptions opt;
        opt.protocol = PROTOCOL_HTTP;
        opt.connect_timeout_ms = FLAGS_consul_connect_timeout_ms;
        opt.timeout_ms = (FLAGS_consul_blocking_query_wait_secs + 10) * butil::Time::kMillisecondsPerSecond;
        if (_channel.Init(FLAGS_consul_agent_addr.c_str(), "rr", &opt) != 0) {
            LOG(ERROR) << "Fail to init channel to consul at " << FLAGS_consul_agent_addr;
            return DegradeToOtherServiceIfNeeded(service_name, servers);
        }
        _consul_connected = true;
    }

    if (_consul_url.empty()) {
        _consul_url.append(FLAGS_consul_service_discovery_url);
        _consul_url.append(service_name);
        _consul_url.append(FLAGS_consul_url_parameter);
    }

    servers->clear();
    std::string consul_url(_consul_url);
    if (!_consul_index.empty()) {
        butil::string_appendf(&consul_url, "&index=%s&wait=%ds", _consul_index.c_str(),
                              FLAGS_consul_blocking_query_wait_secs);
    }

    Controller cntl;
    cntl.http_request().uri() = consul_url;
    _channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
    if (cntl.Failed()) {
        LOG(ERROR) << "Fail to access " << consul_url << ": "
                   << cntl.ErrorText();
        return DegradeToOtherServiceIfNeeded(service_name, servers);
    }

    const std::string* index = cntl.http_response().GetHeader(kConsulIndex);
    if (index != nullptr) {
        if (*index == _consul_index) {
            LOG_EVERY_N(INFO, 100) << "There is no service changed for the list of "
                                   << service_name
                                   << ", consul_index: " << _consul_index;
            return -1;
        }
    } else {
        LOG(ERROR) << "Failed to parse consul index of " << service_name << ".";
        return -1;
    }

    // Sort/unique the inserted vector is faster, but may have a different order
    // of addresses from the file. To make assertions in tests easier, we use
    // set to de-duplicate and keep the order.
    std::set<ServerNode> presence;

    BUTIL_RAPIDJSON_NAMESPACE::Document services;
    services.Parse(cntl.response_attachment().to_string().c_str());
    if (!services.IsArray()) {
        LOG(ERROR) << "The consul's response for "
                   << service_name << " is not a json array";
        return -1;
    }

    for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < services.Size(); ++i) {
        auto itr_service = services[i].FindMember("Service");
        if (itr_service == services[i].MemberEnd()) {
            LOG(ERROR) << "No service info in node: "
                       << RapidjsonValueToString(services[i]);
            continue;
        }

        const BUTIL_RAPIDJSON_NAMESPACE::Value& service = itr_service->value;
        auto itr_address = service.FindMember("Address");
        auto itr_port = service.FindMember("Port");
        if (itr_address == service.MemberEnd() ||
            !itr_address->value.IsString() ||
            itr_port == service.MemberEnd() ||
            !itr_port->value.IsUint()) {
            LOG(ERROR) << "Service with no valid address or port: "
                       << RapidjsonValueToString(service);
            continue;
        }

        butil::EndPoint end_point;
        if (str2endpoint(service["Address"].GetString(),
                         service["Port"].GetUint(),
                         &end_point) != 0) {
            LOG(ERROR) << "Service with illegal address or port: "
                       << RapidjsonValueToString(service);
            continue;
        }

        ServerNode node;
        node.addr = end_point;
        auto itr_tags = service.FindMember("Tags");
        if (itr_tags != service.MemberEnd()) {
            if (itr_tags->value.IsArray()) {
                if (itr_tags->value.Size() > 0) {
                    // Tags in consul is an array, here we only use the first one.
                    const BUTIL_RAPIDJSON_NAMESPACE::Value& tag = itr_tags->value[0];
                    if (tag.IsString()) {
                        node.tag = tag.GetString();
                    } else {
                        LOG(ERROR) << "First tag returned by consul is not string, service: "
                                   << RapidjsonValueToString(service);
                        continue;
                    }
                }
            } else {
                LOG(ERROR) << "Service tags returned by consul is not json array, service: "
                           << RapidjsonValueToString(service);
                continue;
            }
        }

        if (presence.insert(node).second) {
            servers->push_back(node);
        } else {
            RPC_VLOG << "Duplicated server=" << node;
        }
    }

    _consul_index = *index;

    if (servers->empty() && !services.Empty()) {
        LOG(ERROR) << "All service about " << service_name
                   << " from consul is invalid, refuse to update servers";
        return -1;
    }

    RPC_VLOG << "Got " << servers->size()
             << (servers->size() > 1 ? " servers" : " server")
             << " from " << service_name;
    return 0;
}