public void onRingTopologyChanged()

in server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java [251:320]


    public void onRingTopologyChanged(String keyspace, TokenRangeReplicasResponse oldTopology, TokenRangeReplicasResponse newTopology)
    {
        // Purpose: reacts to a topology change of the given keyspace. It derives the local token ranges
        // from the old and the new topology, computes per instance which ranges were lost and which were
        // gained, and then, for every restore job of the keyspace, discards the restore ranges covering
        // the lost ranges and submits restore ranges for the gained ranges.
        //
        // keyspace    - keyspace whose topology changed; used to look up the affected restore jobs
        // oldTopology - previously known topology; null means the topology is newly discovered
        // newTopology - topology to adjust to
        if (oldTopology == null)
        {
            LOGGER.debug("Received RingTopologyChanged notification for new topology discovered. " +
                         "It is already handled inline at findSlicesAndSubmit. Exiting early. " +
                         "keyspace={}", keyspace);
            return;
        }

        Map<Integer, Set<TokenRange>> localRangesFromOld = RingTopologyRefresher.calculateLocalTokenRanges(instanceMetadataFetcher, oldTopology);
        Map<Integer, Set<TokenRange>> localRangesFromNew = RingTopologyRefresher.calculateLocalTokenRanges(instanceMetadataFetcher, newTopology);
        if (Objects.equals(localRangesFromOld, localRangesFromNew))
        {
            LOGGER.debug("Local token ranges derived from both topology are the same. No need to update restore ranges.");
            return;
        }

        // Populate the lostRanges and the gainedRanges
        // For each lost range, we want to cancel the RestoreRange that covers it
        // For each gained range, we want to create the RestoreRange
        Map<Integer, Set<TokenRange>> lostRanges = new HashMap<>(localRangesFromNew.size());
        Map<Integer, Set<TokenRange>> gainedRanges = new HashMap<>(localRangesFromNew.size());
        for (Integer instanceId : Sets.union(localRangesFromOld.keySet(), localRangesFromNew.keySet()))
        {
            Set<TokenRange> rangesFromOld = localRangesFromOld.get(instanceId);
            Set<TokenRange> rangesFromNew = localRangesFromNew.get(instanceId);
            // Templated message: the string is only formatted when the check actually fails
            Preconditions.checkState(rangesFromNew != null || rangesFromOld != null,
                                     "Token ranges of instance: %s do not exist in both old and new", instanceId);
            if (rangesFromOld == null) // instance only present in the new topology
            {
                gainedRanges.put(instanceId, rangesFromNew);
            }
            else if (rangesFromNew == null) // instance only present in the old topology
            {
                lostRanges.put(instanceId, rangesFromOld);
            }
            else
            {
                // The instance exists in both topologies. Only the maps as a whole are known to differ,
                // so the ranges of this particular instance may be unchanged and either side of the
                // diff may be empty. Empty sides are skipped to avoid issuing per-job work for nothing.
                TokenRange.SymmetricDiffResult symmetricDiffResult = TokenRange.symmetricDiff(rangesFromOld, rangesFromNew);
                // ranges that are no longer in the new topology are lost
                if (!symmetricDiffResult.onlyInLeft.isEmpty())
                {
                    lostRanges.put(instanceId, symmetricDiffResult.onlyInLeft);
                }
                // ranges that are new in the new topology are gained
                if (!symmetricDiffResult.onlyInRight.isEmpty())
                {
                    gainedRanges.put(instanceId, symmetricDiffResult.onlyInRight);
                }
            }
        }

        Set<UUID> jobIds = ringTopologyRefresher.allRestoreJobsOfKeyspace(keyspace);
        for (UUID jobId : jobIds)
        {
            try
            {
                // The lookup is inside the try block so that a failure to load one job
                // does not prevent the remaining jobs from being adjusted
                RestoreJob restoreJob = restoreJobDatabaseAccessor.find(jobId);
                if (restoreJob == null)
                {
                    // the lookup returned nothing for this job id; nothing to adjust
                    continue;
                }

                // First, discard all the restore ranges that cover the lost ranges
                lostRanges.forEach((instanceId, ranges) -> discardLostRanges(restoreJob, instanceId, ranges));
                // Next, submit the RestoreRanges from the newly gained ranges
                gainedRanges.forEach((instanceId, ranges) -> findSlicesOfCassandraNodeAndSubmit(restoreJob, instanceId, ranges));
            }
            catch (Exception e)
            {
                // log the warning and continue to process other jobs
                LOGGER.warn("Unexpected exception when adjusting restore job ranges. jobId={}", jobId, e);
            }
        }
    }