in server/src/main/java/org/apache/cassandra/sidecar/coordination/InnerDcTokenAdjacentPeerProvider.java [89:155]
public Set<SidecarInstance> get()
{
Metadata metadata;
try
{
metadata = instanceFetcher.callOnFirstAvailableInstance(instance -> instance.delegate().metadata());
}
catch (Throwable cause)
{
LOGGER.debug("Unable to retrieve metadata", cause);
return Set.of();
}
List<KeyspaceMetadata> keyspaces = metadata.getKeyspaces()
.stream()
// TODO: this should be from configured
.filter(ks -> !DEFAULT_FORBIDDEN_KEYSPACES.contains(ks.getName()))
.collect(Collectors.toList());
if (keyspaces.isEmpty())
{
LOGGER.warn("No user keyspaces found");
return Set.of();
}
Set<Host> localHosts = cassandraClientTokenRingProvider.localInstances();
String localDc = Objects.requireNonNull(localHosts, "CachedLocalTokenRanges not initialized")
.stream()
.map(Host::getDatacenter)
.filter(Objects::nonNull)
.findAny()
.orElseThrow(() -> new RuntimeException("No local instances found."));
Optional<KeyspaceMetadata> maxRfKeyspace = keyspaces.stream()
.filter(ks -> ks.getReplication().containsKey(localDc))
.max(Comparator.comparingInt(a -> Integer.parseInt(a.getReplication().get(localDc))));
if (maxRfKeyspace.isEmpty())
{
LOGGER.info("No keyspace found replicated in DC dc={}", localDc);
return Set.of();
}
int rf = Integer.parseInt(maxRfKeyspace.get().getReplication().get(localDc));
int quorum = rf / 2;
List<Pair<Host, BigInteger>> sortedLocalDcHosts = Objects.requireNonNull(cassandraClientTokenRingProvider.allInstances(),
"CachedLocalTokenRanges not initialized").stream()
.filter(host -> host.getDatacenter().equals(localDc))
.map(host -> Pair.of(host, minToken(host)))
.sorted(Comparator.comparing(Pair::getRight))
.collect(Collectors.toList());
BigInteger localMinToken = minToken(localHosts);
return adjacentHosts(driverUtils, localHosts::contains, localMinToken, sortedLocalDcHosts, quorum)
.stream()
.map(host -> driverUtils.getSocketAddress(host).getAddress().getHostAddress())
.map(sidecarIpAddress -> {
String sidecarHostname = sidecarIpAddress;
try
{
sidecarHostname = dnsResolver.reverseResolve(sidecarIpAddress);
}
catch (UnknownHostException unknownHostException)
{
LOGGER.warn("Unable to reverse resolve hostname for {}", sidecarHostname, unknownHostException);
}
return new SidecarInstanceImpl(sidecarHostname, sidecarServicePort(sidecarHostname));
})
.collect(Collectors.toSet());
}