public static List getReachableServers()

in src/main/java/com/uber/rss/metadata/ServiceRegistryUtils.java [55:142]


    /**
     * Picks up to {@code maxServerCount} reachable RSS servers from the service registry.
     *
     * <p>The registry is asked for {@code maxServerCount + min(5, maxServerCount)} candidates through
     * {@code RetryUtils.retryUntilNotNull} (100 ms retry interval, {@code maxTryMillis} budget), so a few
     * bad servers can be dropped without coming up short. Every candidate is then probed with a
     * busy-status request. Hosts whose probe throws are logged, counted in the {@code unreachableHosts}
     * metric (tagged with {@code remote=<host>}) and excluded. The remaining candidates are ordered by
     * probe latency in 500 ms buckets, then by the concurrent-connection count they report. The first
     * {@code maxServerCount} are returned.
     *
     * @param serviceRegistry registry to query for servers
     * @param maxServerCount maximum number of servers to return
     * @param maxTryMillis time budget handed to the registry retry loop
     * @param dataCenter data center passed through to the registry
     * @param cluster cluster passed through to the registry
     * @param excludeHosts hosts passed to the registry as exclusions
     * @return at most {@code maxServerCount} reachable servers, best first; may be empty if every
     *     candidate failed its probe
     * @throws RssException if the registry lookup yielded a null or empty server list
     */
    public static List<ServerDetail> getReachableServers(ServiceRegistry serviceRegistry, int maxServerCount,
                                                         long maxTryMillis, String dataCenter, String cluster,
                                                         Collection<String> excludeHosts) {
        // TODO make following configurable
        // Ask for a few extra servers in case some are bad; the extras are trimmed from the final list.
        final int extraServerCount = Math.min(5, maxServerCount);
        final int serverCandidateCount = maxServerCount + extraServerCount;

        // Metrics key whose value is passed to ServerCandidate as its concurrent-connection count.
        // Kept as a boxed Long so the map lookup key type is unchanged.
        final Long concurrentConnsMetricKey = Long.valueOf(1L);

        // Latencies that fall into the same window of this width are treated as equivalent, so the
        // concurrent-connection count decides between them.
        final long latencyBucketMillis = 500L;

        final int retryIntervalMillis = 100;
        List<ServerDetail> serverInfos = RetryUtils.retryUntilNotNull(
                retryIntervalMillis,
                maxTryMillis,
                () -> {
                    try {
                        logger.info(String.format("Trying to get max %s RSS servers, data center: %s, cluster: %s, " +
                                "exclude hosts: %s", serverCandidateCount, dataCenter, cluster,
                                StringUtils.join(excludeHosts, ",")));
                        return serviceRegistry.getServers(dataCenter, cluster, serverCandidateCount, excludeHosts);
                    } catch (Throwable ex) {
                        // Best effort: returning null presumably lets retryUntilNotNull try again (confirm).
                        logger.warn("Failed to call ServiceRegistry.getServers", ex);
                        return null;
                    }
                });
        if (serverInfos == null || serverInfos.isEmpty()) {
            throw new RssException("Failed to get all RSS servers");
        }

        // Some hosts may fail intermittently (e.g. UnknownHostException); probe each one and drop failures.
        logger.info(String.format("Got %s RSS servers from service registry, checking their connectivity",
                                    serverInfos.size()));
        ConcurrentLinkedQueue<String> unreachableHosts = new ConcurrentLinkedQueue<>();
        // NOTE(review): parallelStream runs these blocking socket probes on the shared ForkJoin common pool.
        List<ServerCandidate> serverCandidates = serverInfos.parallelStream().map(serverDetail -> {
            ServerHostAndPort hostAndPort = ServerHostAndPort.fromString(serverDetail.getConnectionString());
            String host = hostAndPort.getHost();
            int port = hostAndPort.getPort();
            // nanoTime is monotonic, so wall-clock adjustments cannot distort the measured latency.
            long startNanos = System.nanoTime();
            try (BusyStatusSocketClient busyStatusSocketClient = new BusyStatusSocketClient(host, port,
                    NetworkUtils.DEFAULT_REACHABLE_TIMEOUT, "")) {
                GetBusyStatusResponse getBusyStatusResponse = busyStatusSocketClient.getBusyStatus();
                long requestLatency = (System.nanoTime() - startNanos) / 1_000_000L;
                return new ServerCandidate(serverDetail, requestLatency,
                        getBusyStatusResponse.getMetrics().get(concurrentConnsMetricKey));
            } catch (Throwable ex) {
                logger.warn(String.format("Detected unreachable host %s", host), ex);
                unreachableHosts.add(host);
                return null;
            }
        })
            .filter(candidate -> candidate != null)
            .sorted((o1, o2) -> {
                // Compare fixed latency buckets instead of "difference within 500 ms": a tolerance-based
                // comparison is not transitive (e.g. 0 / 400 / 800 ms) and violates the Comparator
                // contract, which can make the sort throw or return an arbitrary order.
                int byLatencyBucket = Long.compare(o1.getRequestLatency() / latencyBucketMillis,
                        o2.getRequestLatency() / latencyBucketMillis);
                if (byLatencyBucket != 0) {
                    return byLatencyBucket;
                }
                Long connections1 = o1.getConcurrentConnections();
                Long connections2 = o2.getConcurrentConnections();
                // A candidate without a reported connection count sorts after those that have one,
                // instead of failing the whole sort with an unboxing NullPointerException.
                if (connections1 == null || connections2 == null) {
                    if (connections1 == null && connections2 == null) {
                        return 0;
                    }
                    return connections1 == null ? 1 : -1;
                }
                return Long.compare(connections1, connections2);
            })
            .collect(Collectors.toList());

        for (String unreachableHost : unreachableHosts) {
            Map<String, String> tags = new HashMap<>();
            tags.put("remote", unreachableHost);
            Scope scope = M3Stats.createSubScope(tags);
            scope.counter("unreachableHosts").inc(1);
        }

        List<ServerDetail> selectedServers = serverCandidates.stream()
                .limit(maxServerCount)
                .map(ServerCandidate::getServerDetail)
                .collect(Collectors.toList());

        // Log the reachable candidates that did not make the cut (the loop is empty when none were dropped).
        for (int i = selectedServers.size(); i < serverCandidates.size(); i++) {
            logger.info("Ignore RSS server candidate: {}", serverCandidates.get(i));
        }

        return selectedServers;
    }