private SourceSplitChangeResult initializeSourceSplits()

in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [319:373]


    /**
     * Builds the splits to add and the splits to remove for a detected change in message queues,
     * and updates the enumerator's bookkeeping to match.
     *
     * <p>For every queue in the increase set a split is created from the offsets returned by
     * {@code startingOffsetsSelector} and {@code stoppingOffsetsSelector}, and the queue's
     * starting offset is recorded in {@code checkedOffsets}. For every queue in the decrease set
     * a split is created with the four-argument constructor (last argument {@code false}), and
     * the queue is removed from both {@code allocatedSet} and {@code checkedOffsets}.
     *
     * <p>The whole operation runs while holding {@code lock}; the lock is released in a {@code
     * finally} block so that neither a normal return nor an exception leaves it held.
     *
     * @param sourceChangeResult the increase/decrease sets of message queues to process
     * @return the splits built for the increased queues and for the decreased queues
     * @throws NullPointerException if the starting-offsets selector returns no entry for one of
     *     the queues (the {@code Long} result of {@code Map.get} is unboxed to {@code long})
     */
    private SourceSplitChangeResult initializeSourceSplits(SourceChangeResult sourceChangeResult) {
        lock.lock();
        try {
            Set<MessageQueue> increaseSet = sourceChangeResult.getIncreaseSet();
            Set<MessageQueue> decreaseSet = sourceChangeResult.getDecreaseSet();

            OffsetsSelector.MessageQueueOffsetsRetriever offsetsRetriever =
                    new InnerConsumerImpl.RemotingOffsetsRetrieverImpl(consumer);

            Map<MessageQueue, Long> increaseStartingOffsets =
                    startingOffsetsSelector.getMessageQueueOffsets(increaseSet, offsetsRetriever);
            Map<MessageQueue, Long> increaseStoppingOffsets =
                    stoppingOffsetsSelector.getMessageQueueOffsets(increaseSet, offsetsRetriever);
            Map<MessageQueue, Long> decreaseStoppingOffsets =
                    stoppingOffsetsSelector.getMessageQueueOffsets(decreaseSet, offsetsRetriever);
            Map<MessageQueue, Long> decreaseStartingOffsets =
                    startingOffsetsSelector.getMessageQueueOffsets(decreaseSet, offsetsRetriever);

            Set<RocketMQSourceSplit> increaseSplitSet =
                    increaseSet.stream()
                            .map(
                                    mq -> {
                                        long startingOffset = increaseStartingOffsets.get(mq);
                                        long stoppingOffset =
                                                increaseStoppingOffsets.getOrDefault(
                                                        mq,
                                                        RocketMQSourceSplit.NO_STOPPING_OFFSET);
                                        return new RocketMQSourceSplit(
                                                mq, startingOffset, stoppingOffset);
                                    })
                            .collect(Collectors.toSet());

            // Update cache with the starting offset of every newly discovered queue.
            // NOTE(review): the fallback constant is NO_STOPPING_OFFSET although the value stored
            // is a starting offset; kept unchanged to preserve behavior -- confirm intent.
            increaseSet.forEach(
                    mq ->
                            checkedOffsets.put(
                                    mq,
                                    increaseStartingOffsets.getOrDefault(
                                            mq, RocketMQSourceSplit.NO_STOPPING_OFFSET)));

            // Build the removal splits without touching shared state inside the stream pipeline.
            Set<RocketMQSourceSplit> decreaseSplitSet =
                    decreaseSet.stream()
                            .map(
                                    mq -> {
                                        long startingOffset = decreaseStartingOffsets.get(mq);
                                        long stoppingOffset =
                                                decreaseStoppingOffsets.getOrDefault(
                                                        mq,
                                                        RocketMQSourceSplit.NO_STOPPING_OFFSET);
                                        return new RocketMQSourceSplit(
                                                mq, startingOffset, stoppingOffset, false);
                                    })
                            .collect(Collectors.toSet());

            // Drop bookkeeping for removed queues only after all removal splits were built, so a
            // failure above does not leave the caches partially updated.
            for (MessageQueue mq : decreaseSet) {
                allocatedSet.remove(mq);
                checkedOffsets.remove(mq);
            }

            return new SourceSplitChangeResult(increaseSplitSet, decreaseSplitSet);
        } finally {
            // Always release the lock, including when offset retrieval or unboxing throws.
            lock.unlock();
        }
    }