in src/brpc/policy/consul_naming_service.cpp [77:210]
// Fetches the server list of `service_name` from the consul agent and stores
// the de-duplicated result in `servers` (which is cleared first).
//
// The channel to FLAGS_consul_agent_addr is initialized lazily on the first
// call, and the query url is built once and cached in _consul_url. After the
// first successful query, the last seen consul index is appended to the url
// together with a wait time so that the request becomes a blocking query.
//
// Returns:
//   0   when a new list was parsed into `servers`.
//   -1  when the list should not be updated: the index header is missing,
//       the index did not change, the body is not a json array, or every
//       entry of a non-empty response was invalid.
//   Otherwise the result of DegradeToOtherServiceIfNeeded(), which is called
//   when the channel cannot be initialized or the http request fails.
int ConsulNamingService::GetServers(const char* service_name,
                                    std::vector<ServerNode>* servers) {
    if (!_consul_connected) {
        ChannelOptions opt;
        opt.protocol = PROTOCOL_HTTP;
        opt.connect_timeout_ms = FLAGS_consul_connect_timeout_ms;
        // The rpc timeout must outlast the blocking query wait time, hence
        // the extra 10 seconds on top of it.
        opt.timeout_ms = (FLAGS_consul_blocking_query_wait_secs + 10) * butil::Time::kMillisecondsPerSecond;
        if (_channel.Init(FLAGS_consul_agent_addr.c_str(), "rr", &opt) != 0) {
            LOG(ERROR) << "Fail to init channel to consul at " << FLAGS_consul_agent_addr;
            return DegradeToOtherServiceIfNeeded(service_name, servers);
        }
        _consul_connected = true;
    }

    if (_consul_url.empty()) {
        _consul_url.append(FLAGS_consul_service_discovery_url);
        _consul_url.append(service_name);
        _consul_url.append(FLAGS_consul_url_parameter);
    }

    servers->clear();
    std::string consul_url(_consul_url);
    if (!_consul_index.empty()) {
        butil::string_appendf(&consul_url, "&index=%s&wait=%ds", _consul_index.c_str(),
                              FLAGS_consul_blocking_query_wait_secs);
    }

    Controller cntl;
    cntl.http_request().uri() = consul_url;
    _channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
    if (cntl.Failed()) {
        LOG(ERROR) << "Fail to access " << consul_url << ": "
                   << cntl.ErrorText();
        return DegradeToOtherServiceIfNeeded(service_name, servers);
    }

    const std::string* index = cntl.http_response().GetHeader(kConsulIndex);
    if (index == nullptr) {
        LOG(ERROR) << "Failed to parse consul index of " << service_name << ".";
        return -1;
    }
    if (*index == _consul_index) {
        LOG_EVERY_N(INFO, 100) << "There is no service changed for the list of "
                               << service_name
                               << ", consul_index: " << _consul_index;
        return -1;
    }

    // Sort/unique the inserted vector is faster, but may have a different order
    // of addresses from the response. To make assertions in tests easier, we
    // use set to de-duplicate and keep the order.
    std::set<ServerNode> presence;

    BUTIL_RAPIDJSON_NAMESPACE::Document services;
    services.Parse(cntl.response_attachment().to_string().c_str());
    if (!services.IsArray()) {
        LOG(ERROR) << "The consul's response for "
                   << service_name << " is not a json array";
        return -1;
    }

    for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < services.Size(); ++i) {
        const BUTIL_RAPIDJSON_NAMESPACE::Value& entry = services[i];
        // FindMember()/MemberEnd() are only valid on json objects; calling
        // them on anything else trips a rapidjson assertion (or is undefined
        // behavior when assertions are compiled out).
        if (!entry.IsObject()) {
            LOG(ERROR) << "Node returned by consul is not a json object: "
                       << RapidjsonValueToString(entry);
            continue;
        }
        auto itr_service = entry.FindMember("Service");
        if (itr_service == entry.MemberEnd() ||
            !itr_service->value.IsObject()) {
            LOG(ERROR) << "No service info in node: "
                       << RapidjsonValueToString(entry);
            continue;
        }

        const BUTIL_RAPIDJSON_NAMESPACE::Value& service = itr_service->value;
        auto itr_address = service.FindMember("Address");
        auto itr_port = service.FindMember("Port");
        if (itr_address == service.MemberEnd() ||
            !itr_address->value.IsString() ||
            itr_port == service.MemberEnd() ||
            !itr_port->value.IsUint()) {
            LOG(ERROR) << "Service with no valid address or port: "
                       << RapidjsonValueToString(service);
            continue;
        }

        // Reuse the iterators found above instead of looking the members up
        // a second time.
        butil::EndPoint end_point;
        if (str2endpoint(itr_address->value.GetString(),
                         itr_port->value.GetUint(),
                         &end_point) != 0) {
            LOG(ERROR) << "Service with illegal address or port: "
                       << RapidjsonValueToString(service);
            continue;
        }

        ServerNode node;
        node.addr = end_point;
        auto itr_tags = service.FindMember("Tags");
        if (itr_tags != service.MemberEnd()) {
            if (!itr_tags->value.IsArray()) {
                LOG(ERROR) << "Service tags returned by consul is not json array, service: "
                           << RapidjsonValueToString(service);
                continue;
            }
            if (itr_tags->value.Size() > 0) {
                // Tags in consul is an array, here we only use the first one.
                const BUTIL_RAPIDJSON_NAMESPACE::Value& tag = itr_tags->value[0];
                if (!tag.IsString()) {
                    LOG(ERROR) << "First tag returned by consul is not string, service: "
                               << RapidjsonValueToString(service);
                    continue;
                }
                node.tag = tag.GetString();
            }
        }

        if (presence.insert(node).second) {
            servers->push_back(node);
        } else {
            RPC_VLOG << "Duplicated server=" << node;
        }
    }

    // The index is remembered even when the list below is refused.
    // NOTE(review): presumably so that the next blocking query waits for a
    // newer change instead of fetching the same invalid payload again.
    _consul_index = *index;

    if (servers->empty() && !services.Empty()) {
        LOG(ERROR) << "All service about " << service_name
                   << " from consul is invalid, refuse to update servers";
        return -1;
    }

    RPC_VLOG << "Got " << servers->size()
             << (servers->size() > 1 ? " servers" : " server")
             << " from " << service_name;
    return 0;
}