in server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java [251:320]
public void onRingTopologyChanged(String keyspace, TokenRangeReplicasResponse oldTopology, TokenRangeReplicasResponse newTopology)
{
if (oldTopology == null)
{
LOGGER.debug("Received RingTopologyChanged notification for new topology discovered. " +
"It is already handled inline at findSlicesAndSubmit. Exiting early. " +
"keyspace={}", keyspace);
return;
}
Map<Integer, Set<TokenRange>> localRangesFromOld = RingTopologyRefresher.calculateLocalTokenRanges(instanceMetadataFetcher, oldTopology);
Map<Integer, Set<TokenRange>> localRangesFromNew = RingTopologyRefresher.calculateLocalTokenRanges(instanceMetadataFetcher, newTopology);
if (Objects.equals(localRangesFromOld, localRangesFromNew))
{
LOGGER.debug("Local token ranges derived from both topology are the same. No need to update restore ranges.");
return;
}
// Populate the lostRanges and the gainedRanges
// For each lost range, we want to cancel the RestoreRange that covers it
// For each gained range, we want to create the RestoreRange
Map<Integer, Set<TokenRange>> lostRanges = new HashMap<>(localRangesFromNew.size());
Map<Integer, Set<TokenRange>> gainedRanges = new HashMap<>(localRangesFromNew.size());
for (Integer instanceId : Sets.union(localRangesFromOld.keySet(), localRangesFromNew.keySet()))
{
Set<TokenRange> rangesFromOld = localRangesFromOld.get(instanceId);
Set<TokenRange> rangesFromNew = localRangesFromNew.get(instanceId);
Preconditions.checkState(rangesFromNew != null || rangesFromOld != null,
"Token ranges of instance: " + instanceId + " do not exist in both old and new");
if (rangesFromOld == null) // new node
{
gainedRanges.put(instanceId, rangesFromNew);
}
else if (rangesFromNew == null) // removed node
{
lostRanges.put(instanceId, rangesFromOld);
}
else // both new and old ranges exist and they differs
{
TokenRange.SymmetricDiffResult symmetricDiffResult = TokenRange.symmetricDiff(rangesFromOld, rangesFromNew);
// ranges that are no longer in the new topology are lost
lostRanges.put(instanceId, symmetricDiffResult.onlyInLeft);
// ranges that are new in the new topology are gained
gainedRanges.put(instanceId, symmetricDiffResult.onlyInRight);
}
}
Set<UUID> jobIds = ringTopologyRefresher.allRestoreJobsOfKeyspace(keyspace);
for (UUID jobId : jobIds)
{
RestoreJob restoreJob = restoreJobDatabaseAccessor.find(jobId);
if (restoreJob == null)
{
continue;
}
try
{
// First, discard all the restore ranges that cover the lost ranges
lostRanges.forEach((instanceId, ranges) -> discardLostRanges(restoreJob, instanceId, ranges));
// Next, submit the RestoreRanges from the newly gained ranges
gainedRanges.forEach((instanceId, ranges) -> findSlicesOfCassandraNodeAndSubmit(restoreJob, instanceId, ranges));
}
catch (Exception e)
{
// log the warning and continue to process other jobs
LOGGER.warn("Unexpected exception when adjusting restore job ranges. jobId={}", jobId, e);
}
}
}