in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [319:373]
private SourceSplitChangeResult initializeSourceSplits(SourceChangeResult sourceChangeResult) {
lock.lock();
Set<MessageQueue> increaseSet = sourceChangeResult.getIncreaseSet();
Set<MessageQueue> decreaseSet = sourceChangeResult.getDecreaseSet();
OffsetsSelector.MessageQueueOffsetsRetriever offsetsRetriever =
new InnerConsumerImpl.RemotingOffsetsRetrieverImpl(consumer);
Map<MessageQueue, Long> increaseStartingOffsets =
startingOffsetsSelector.getMessageQueueOffsets(increaseSet, offsetsRetriever);
Map<MessageQueue, Long> increaseStoppingOffsets =
stoppingOffsetsSelector.getMessageQueueOffsets(increaseSet, offsetsRetriever);
Map<MessageQueue, Long> decreaseStoppingOffsets =
stoppingOffsetsSelector.getMessageQueueOffsets(decreaseSet, offsetsRetriever);
Map<MessageQueue, Long> decreaseStartingOffsets =
startingOffsetsSelector.getMessageQueueOffsets(decreaseSet, offsetsRetriever);
Set<RocketMQSourceSplit> increaseSplitSet =
increaseSet.stream()
.map(
mq -> {
long startingOffset = increaseStartingOffsets.get(mq);
long stoppingOffset =
increaseStoppingOffsets.getOrDefault(
mq, RocketMQSourceSplit.NO_STOPPING_OFFSET);
return new RocketMQSourceSplit(
mq, startingOffset, stoppingOffset);
})
.collect(Collectors.toSet());
// Update cache
increaseSet.forEach(
mq ->
checkedOffsets.put(
mq,
increaseStartingOffsets.getOrDefault(
mq, RocketMQSourceSplit.NO_STOPPING_OFFSET)));
Set<RocketMQSourceSplit> decreaseSplitSet =
decreaseSet.stream()
.map(
mq -> {
long startingOffset = decreaseStartingOffsets.get(mq);
long stoppingOffset =
decreaseStoppingOffsets.getOrDefault(
mq, RocketMQSourceSplit.NO_STOPPING_OFFSET);
allocatedSet.remove(mq);
checkedOffsets.remove(mq);
return new RocketMQSourceSplit(
mq, startingOffset, stoppingOffset, false);
})
.collect(Collectors.toSet());
return new SourceSplitChangeResult(increaseSplitSet, decreaseSplitSet);
}