in src/brpc/policy/nacos_naming_service.cpp [108:233]
// Query nacos for the instances of a service and store the usable ones in
// `nodes`.
//
// service_name:  appended verbatim to the query string of the request URL.
// token_changed: when true, the cached request URL is rebuilt so that the
//                current `_access_token` is used.
// nodes:         overwritten with the de-duplicated set of servers that are
//                both "enabled" and "healthy" and have a parseable ip/port.
//                A numeric "weight" is truncated to an integer and stored as
//                the node's tag.
//
// When the response carries an integral "cacheMillis", `_cache_ms` is updated
// with it.
//
// Returns 0 on success. Returns -1 when the request fails, the HTTP status is
// not 200, the body is not the expected JSON shape, or the response lists
// hosts but none of them is usable.
int NacosNamingService::GetServerNodes(const char *service_name,
                                       bool token_changed,
                                       std::vector<ServerNode> *nodes) {
    // The URL is cached across calls and only rebuilt on first use or after
    // the access token changed.
    if (_nacos_url.empty() || token_changed) {
        _nacos_url = FLAGS_nacos_service_discovery_path;
        _nacos_url += "?";
        if (!_access_token.empty()) {
            _nacos_url += "accessToken=" + _access_token;
            _nacos_url += "&";
        }
        _nacos_url += service_name;
    }

    Controller cntl;
    cntl.http_request().uri() = _nacos_url;
    _channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
    if (cntl.Failed()) {
        LOG(ERROR) << "Fail to access " << _nacos_url << ": "
                   << cntl.ErrorText();
        return -1;
    }
    if (cntl.http_response().status_code() != HTTP_STATUS_OK) {
        LOG(ERROR) << "Failed to request nacos, http status code: "
                   << cntl.http_response().status_code();
        return -1;
    }

    BUTIL_RAPIDJSON_NAMESPACE::Document doc;
    if (doc.Parse(cntl.response_attachment().to_string().c_str())
            .HasParseError()) {
        LOG(ERROR) << "Failed to parse nacos response";
        return -1;
    }
    if (!doc.IsObject()) {
        LOG(ERROR) << "The nacos's response for " << service_name
                   << " is not a json object";
        return -1;
    }
    auto it_hosts = doc.FindMember("hosts");
    if (it_hosts == doc.MemberEnd()) {
        LOG(ERROR) << "The nacos's response for " << service_name
                   << " has no hosts member";
        return -1;
    }
    auto &hosts = it_hosts->value;
    if (!hosts.IsArray()) {
        LOG(ERROR) << "hosts member in nacos response is not an array";
        return -1;
    }

    // A set collapses duplicated entries before they are handed to `nodes`.
    std::set<ServerNode> presence;
    for (auto it = hosts.Begin(); it != hosts.End(); ++it) {
        auto &host = *it;
        if (!host.IsObject()) {
            LOG(ERROR) << "host member in nacos response is not an object";
            continue;
        }
        auto it_ip = host.FindMember("ip");
        if (it_ip == host.MemberEnd() || !it_ip->value.IsString()) {
            LOG(ERROR) << "host in nacos response has not ip";
            continue;
        }
        const char *ip = it_ip->value.GetString();
        auto it_port = host.FindMember("port");
        if (it_port == host.MemberEnd() || !it_port->value.IsInt()) {
            LOG(ERROR) << "host in nacos response has not port";
            continue;
        }
        // The value was only verified with IsInt(), so it must be read with
        // GetInt(): calling GetUint() on a negative number violates
        // rapidjson's precondition. A negative port is passed on to
        // str2endpoint below, which is expected to reject it.
        const int port = it_port->value.GetInt();

        // A missing or non-boolean flag is treated the same as `false`.
        auto it_enabled = host.FindMember("enabled");
        if (it_enabled == host.MemberEnd() || !(it_enabled->value.IsBool()) ||
            !(it_enabled->value.GetBool())) {
            LOG(INFO) << "nacos " << ip << ":" << port << " is not enabled";
            continue;
        }
        auto it_healthy = host.FindMember("healthy");
        if (it_healthy == host.MemberEnd() || !(it_healthy->value.IsBool()) ||
            !(it_healthy->value.GetBool())) {
            LOG(INFO) << "nacos " << ip << ":" << port << " is not healthy";
            continue;
        }

        butil::EndPoint end_point;
        if (str2endpoint(ip, port, &end_point) != 0) {
            LOG(ERROR) << "ncos service with illegal address or port: "
                       << ip << ":" << port;
            continue;
        }
        ServerNode node(end_point);
        auto it_weight = host.FindMember("weight");
        if (it_weight != host.MemberEnd() && it_weight->value.IsNumber()) {
            // The fractional part of the weight is dropped.
            node.tag =
                std::to_string(static_cast<long>(it_weight->value.GetDouble()));
        }
        presence.insert(node);
    }
    nodes->assign(presence.begin(), presence.end());

    // Hosts were listed but every one was filtered out: report failure
    // instead of returning an empty server list as a successful result.
    if (nodes->empty() && hosts.Size() != 0) {
        LOG(ERROR) << "All service about " << service_name
                   << " from nacos is invalid, refuse to update servers";
        return -1;
    }
    RPC_VLOG << "Got " << nodes->size()
             << (nodes->size() > 1 ? " servers" : " server") << " from "
             << service_name;

    auto it_cache = doc.FindMember("cacheMillis");
    if (it_cache != doc.MemberEnd() && it_cache->value.IsInt64()) {
        _cache_ms = it_cache->value.GetInt64();
    }
    return 0;
}