in server/src/main/java/org/apache/cassandra/sidecar/restore/RingTopologyRefresher.java [409:456]
/**
 * Loads the ring topology of one keyspace and reconciles it with the entry kept in {@code mapping}.
 * <p>
 * The freshly loaded topology replaces the stored one when nothing was stored before, or when the
 * write replicas differ; otherwise the stored entry is kept. In both replacing cases
 * {@code asyncDispatcher} is notified after the map update. On the very first load, a promise
 * registered for the keyspace (if any) is completed with the loaded topology.
 *
 * @param keyspace keyspace whose topology is loaded
 * @param loader   function that produces the topology for the given keyspace
 * @return the topology produced by {@code loader}; {@code null} when any step throws, in which case
 *         a promise registered for the keyspace (if any) is failed and removed
 */
TokenRangeReplicasResponse loadOne(String keyspace, Function<String, TokenRangeReplicasResponse> loader)
{
    try
    {
        TokenRangeReplicasResponse latest = loader.apply(keyspace);
        RingTopologyChangeContext change = new RingTopologyChangeContext(keyspace, latest);
        mapping.compute(keyspace, (ks, previous) -> {
            change.existing = previous;

            // Same write replicas as before: keep the stored entry and do not dispatch
            if (previous != null && previous.writeReplicas().equals(latest.writeReplicas()))
            {
                LOGGER.debug("Ring topology of keyspace is unchanged. keyspace={}", keyspace);
                return previous;
            }

            if (previous == null)
            {
                // First topology retrieved for the keyspace: fulfill the pending promise, if one exists.
                // The promise for the job is made via the `promise(RestoreJob)` method
                promises.computeIfPresent(keyspace, (unusedKey, pending) -> {
                    pending.tryComplete(latest);
                    return pending;
                });
            }
            else
            {
                LOGGER.info("Ring topology of keyspace is changed. keyspace={}", keyspace);
            }

            // Both the initial load and a changed topology store the new value and trigger a dispatch
            change.shouldDispatch = true;
            return latest;
        });

        if (change.shouldDispatch)
        {
            asyncDispatcher.onRingTopologyChanged(change.keyspace, change.existing, change.current);
        }
        return latest;
    }
    catch (Throwable failure)
    {
        LOGGER.warn("Failure during load topology for keyspace. keyspace={}", keyspace, failure);
        promises.computeIfPresent(keyspace, (unusedKey, pending) -> {
            pending.tryFail(new IllegalStateException("Failed to load topology for keyspace: " + keyspace, failure));
            // mapping the key to null removes the promise
            return null;
        });
        return null;
    }
}