in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedKeysRangeGenerator.java [134:169]
/**
 * Builds a {@link FixedKeysRangeGenerator} from the collected key hashes.
 *
 * <p>Walks {@code keyHashes} in iteration order and merges every run of hashes that differ by
 * exactly one into a single {@link TopicRange} whose bounds are the first and the last hash of
 * that run. The resulting list is handed to {@code validateTopicRanges} before the generator is
 * created.
 *
 * <p>NOTE(review): the merging only produces minimal, non-overlapping ranges if
 * {@code keyHashes} iterates in ascending order without duplicates - confirm against the
 * field's declaration.
 *
 * @return a generator backed by the merged topic ranges.
 */
public FixedKeysRangeGenerator build() {
    List<TopicRange> ranges = new ArrayList<>();

    // Bounds of the run currently being assembled; rangeStart == null means "no open run".
    Integer rangeStart = null;
    Integer rangeEnd = null;

    for (Integer hash : keyHashes) {
        // A gap after the previous hash ends the open run, so emit it and reset.
        if (rangeStart != null && hash - rangeEnd != 1) {
            ranges.add(new TopicRange(rangeStart, rangeEnd));
            rangeStart = null;
        }

        // Open a new run at this hash when none is in progress.
        if (rangeStart == null) {
            rangeStart = hash;
        }

        // The current hash is always the (so far) last element of the open run.
        rangeEnd = hash;
    }

    // Emit the trailing run, if any hash was seen at all.
    if (rangeStart != null) {
        ranges.add(new TopicRange(rangeStart, rangeEnd));
    }

    validateTopicRanges(ranges);
    return new FixedKeysRangeGenerator(ranges);
}