in src/main/java/com/uber/rss/metadata/ServiceRegistryUtils.java [55:142]
/**
 * Fetches RSS server candidates from the service registry, probes each candidate for its busy
 * status, and returns at most {@code maxServerCount} of the servers that answered the probe,
 * most preferable first.
 *
 * <p>The registry is asked for {@code maxServerCount + min(5, maxServerCount)} servers so that
 * candidates which fail the probe can be dropped without shrinking the final list. Candidates
 * that fail the probe are logged and counted in the {@code unreachableHosts} metric (tagged with
 * the host name). Surviving candidates are ordered by probe latency in 500 ms buckets, and by
 * reported concurrent connections inside the same bucket.
 *
 * @param serviceRegistry registry queried for server candidates
 * @param maxServerCount maximum number of servers to return
 * @param maxTryMillis time budget handed to the registry lookup retry helper
 * @param dataCenter data center passed through to the registry lookup
 * @param cluster cluster passed through to the registry lookup
 * @param excludeHosts hosts passed through to the registry lookup as exclusions
 * @return up to {@code maxServerCount} servers that answered the probe; may be empty when no
 *     candidate answered
 * @throws RssException if the registry lookup produced no server list or an empty one
 */
public static List<ServerDetail> getReachableServers(ServiceRegistry serviceRegistry, int maxServerCount,
                                                     long maxTryMillis, String dataCenter, String cluster,
                                                     Collection<String> excludeHosts) {
    // TODO make following configurable
    // Ask for extra servers in case some are bad; the surplus is trimmed from the final list.
    final int extraServerCount = Math.min(5, maxServerCount);
    final int serverCandidateCount = maxServerCount + extraServerCount;
    // Key of the concurrent-connections entry in the busy status metrics map.
    // NOTE(review): the value 1 is carried over unchanged; presumably it mirrors a server-side
    // metric id and should live in a shared constant - confirm.
    final Long concurrentConnsMetricKey = Long.valueOf(1L);
    // Latencies that fall into the same bucket are treated as equivalent when ranking servers.
    final long latencyBucketMillis = 500;
    final int retryIntervalMillis = 100;

    List<ServerDetail> serverInfos = RetryUtils.retryUntilNotNull(
        retryIntervalMillis,
        maxTryMillis,
        () -> {
            try {
                logger.info(String.format("Trying to get max %s RSS servers, data center: %s, cluster: %s, " +
                        "exclude hosts: %s", serverCandidateCount, dataCenter, cluster,
                    StringUtils.join(excludeHosts, ",")));
                return serviceRegistry.getServers(dataCenter, cluster, serverCandidateCount, excludeHosts);
            } catch (Throwable ex) {
                // Returning null signals the retry helper that this attempt produced no result.
                logger.warn("Failed to call ServiceRegistry.getServers", ex);
                return null;
            }
        });

    if (serverInfos == null || serverInfos.isEmpty()) {
        throw new RssException("Failed to get all RSS servers");
    }

    // Some hosts may hit UnknownHostException sometimes; exclude those hosts.
    logger.info(String.format("Got %s RSS servers from service registry, checking their connectivity",
        serverInfos.size()));

    // Filled from the parallel probe below, hence a concurrent collection.
    ConcurrentLinkedQueue<String> unreachableHosts = new ConcurrentLinkedQueue<>();

    List<ServerCandidate> serverCandidates = serverInfos.parallelStream().map(t -> {
            ServerHostAndPort hostAndPort = ServerHostAndPort.fromString(t.getConnectionString());
            String host = hostAndPort.getHost();
            int port = hostAndPort.getPort();
            long startTime = System.currentTimeMillis();
            try (BusyStatusSocketClient busyStatusSocketClient = new BusyStatusSocketClient(host, port,
                NetworkUtils.DEFAULT_REACHABLE_TIMEOUT, "")) {
                GetBusyStatusResponse getBusyStatusResponse = busyStatusSocketClient.getBusyStatus();
                long requestLatency = System.currentTimeMillis() - startTime;
                return new ServerCandidate(t, requestLatency,
                    getBusyStatusResponse.getMetrics().get(concurrentConnsMetricKey));
            } catch (Throwable ex) {
                logger.warn(String.format("Detected unreachable host %s", host), ex);
                unreachableHosts.add(host);
                return null;
            }
        })
        .filter(t -> t != null)
        .sorted((o1, o2) -> {
            // Compare by latency bucket instead of "difference larger than 500 ms": the pairwise
            // difference test is not transitive (0 / 400 / 800 ms form a cycle once connection
            // counts disagree), which violates the Comparator contract and can make the sort
            // throw or produce an input-order-dependent result.
            long latency1 = o1.getRequestLatency();
            long latency2 = o2.getRequestLatency();
            int byLatencyBucket = Long.compare(latency1 / latencyBucketMillis, latency2 / latencyBucketMillis);
            if (byLatencyBucket != 0) {
                return byLatencyBucket;
            }
            // Comparable latency: prefer the server reporting fewer concurrent connections.
            Long connections1 = o1.getConcurrentConnections();
            Long connections2 = o2.getConcurrentConnections();
            return Long.compare(connections1, connections2);
        })
        .collect(Collectors.toList());

    // Emit one counter increment per host that failed the probe, tagged with the host name.
    for (String unreachableHost : unreachableHosts) {
        Map<String, String> tags = new HashMap<>();
        tags.put("remote", unreachableHost);
        Scope scope = M3Stats.createSubScope(tags);
        scope.counter("unreachableHosts").inc(1);
    }

    serverInfos = serverCandidates.stream()
        .limit(maxServerCount)
        .map(ServerCandidate::getServerDetail)
        .collect(Collectors.toList());

    // Log the surplus candidates that ranked below the cut-off.
    for (int i = serverInfos.size(); i < serverCandidates.size(); i++) {
        ServerCandidate ignoredServerCandidate = serverCandidates.get(i);
        logger.info("Ignore RSS server candidate: {}", ignoredServerCandidate);
    }

    return serverInfos;
}