in server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRangeReplicas.java [327:397]
/**
 * Resolves a group of intersecting ranges into consecutive sub-ranges and appends them to {@code output}.
 * <p>
 * The group is collected through {@code extractIntersectingRanges} and then walked in the order that helper
 * supplied it, using a min-heap keyed on range end to track the ranges that are still open. Every emitted
 * sub-range starts where the previously emitted range ended (or at the heap head's start when {@code output}
 * is still empty) and carries the replicas that {@code getBatchReplicas} derives from the heap contents at
 * that moment.
 *
 * @param output  list the sub-ranges are appended to; the end of its last element, when present, is used
 *                as the start of the next sub-range
 * @param iter    iterator handed to {@code extractIntersectingRanges}
 * @param current range handed to {@code extractIntersectingRanges} as the current range
 * @param next    range handed to {@code extractIntersectingRanges} as the range following {@code current}
 * @return the value returned by {@code extractIntersectingRanges}; presumably the first range that is not
 * part of the intersecting group - confirm against that helper
 */
static TokenRangeReplicas processIntersectingRanges(List<TokenRangeReplicas> output,
                                                    Iterator<TokenRangeReplicas> iter,
                                                    TokenRangeReplicas current,
                                                    TokenRangeReplicas next)
{
    // min-heap with a comparator comparing the ends of ranges; equal ends fall back to the ranges' compareTo
    PriorityQueue<TokenRangeReplicas> rangeHeap =
    new PriorityQueue<>((n1, n2) -> (!n1.end.equals(n2.end())) ?
                                    n1.end().compareTo(n2.end()) : n1.compareTo(n2));
    List<TokenRangeReplicas> intersectingRanges = new ArrayList<>();
    next = extractIntersectingRanges(intersectingRanges::add, iter, current, next);
    rangeHeap.add(intersectingRanges.get(0));

    // The first range seeds the heap; every following range is compared against the current heap head
    for (TokenRangeReplicas r : intersectingRanges.subList(1, intersectingRanges.size()))
    {
        // Defensive guard kept from the original flow: a range is only processed while the heap has a head
        if (rangeHeap.isEmpty())
        {
            continue;
        }

        TokenRangeReplicas range = rangeHeap.peek();
        // Use the last processed range's end as the new range's start
        // Except when it is the first range, in which case, we use the queue-head's start
        Token newStart = output.isEmpty() ? range.start() : output.get(output.size() - 1).end();
        int startVsHeadEnd = r.start().compareTo(range.end());
        if (startVsHeadEnd == 0)
        {
            // The new range begins exactly where the head ends: close the head at that token
            output.add(new TokenRangeReplicas(newStart,
                                              r.start(),
                                              range.partitioner,
                                              getBatchReplicas(rangeHeap)));
            rangeHeap.poll();
        }
        else if (startVsHeadEnd > 0)
        {
            // The new range begins after the head ends: close the head at its own end
            output.add(new TokenRangeReplicas(newStart,
                                              range.end(),
                                              range.partitioner,
                                              getBatchReplicas(rangeHeap)));
            rangeHeap.poll();
        }
        // Start-token is before the first intersecting range end. We have not encountered end of the range, so
        // it is not removed from the heap yet.
        else if (newStart.compareTo(r.start()) != 0)
        {
            output.add(new TokenRangeReplicas(newStart,
                                              r.start(),
                                              range.partitioner,
                                              getBatchReplicas(rangeHeap)));
        }
        rangeHeap.add(r);
    }

    // Remaining intersecting ranges from heap are processed
    while (!rangeHeap.isEmpty())
    {
        // Placeholder form avoids building the message when debug logging is disabled
        LOGGER.debug("Non-empty heap while resolving intersecting ranges:{}", rangeHeap.size());
        TokenRangeReplicas nextVal = rangeHeap.peek();
        Token newStart = output.isEmpty() ? nextVal.start() : output.get(output.size() - 1).end();
        // Corner case w/ common end ranges - we do not add redundant single token range
        if (newStart.compareTo(nextVal.end()) != 0)
        {
            output.add(new TokenRangeReplicas(newStart,
                                              nextVal.end(),
                                              nextVal.partitioner,
                                              getBatchReplicas(rangeHeap)));
        }
        rangeHeap.poll();
    }
    return next;
}