private void handleSourceCheckEvent()

in src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java [198:239]


    /**
     * Reconciles the message queues carried by a {@link SourceCheckEvent} with the queues found
     * in the reader's current offset table.
     *
     * <p>Queues that appear in the event but not in the reader's table are passed to {@code
     * reader.handleSplitsChanges} as splits built with the flag {@code true}; queues that appear
     * in the reader's table but not in the event are passed as splits built with the flag {@code
     * false}. When neither side has extra queues, only an info message is logged.
     *
     * @param sourceEvent event whose {@code getAssignedMq()} maps each queue to an offset pair
     */
    private void handleSourceCheckEvent(SourceCheckEvent sourceEvent) {
        Map<MessageQueue, Tuple2<Long, Long>> assignedOffsets = sourceEvent.getAssignedMq();
        // NOTE(review): assumes the reader's offset table stores the same Tuple2<Long, Long>
        // offset pair per queue as the event does -- confirm against the split reader.
        Map<MessageQueue, Tuple2<Long, Long>> currentOffsets = reader.getCurrentOffsetTable();

        Set<MessageQueue> increaseSet =
                Sets.difference(assignedOffsets.keySet(), currentOffsets.keySet());
        Set<MessageQueue> decreaseSet =
                Sets.difference(currentOffsets.keySet(), assignedOffsets.keySet());

        // Added queues take their offsets from the event. Removed queues are, by construction,
        // missing from the event map, so their offsets must come from the reader's own table;
        // looking them up in the event map always yielded null and threw a NullPointerException.
        SplitsAddition<RocketMQSourceSplit> increase =
                buildSplitsChange(increaseSet, assignedOffsets, true);
        SplitsAddition<RocketMQSourceSplit> decrease =
                buildSplitsChange(decreaseSet, currentOffsets, false);

        // Emptiness is decided on the materialized split lists, not on the set views, so the
        // check and the data handed to the reader cannot disagree.
        if (increase.splits().isEmpty() && decrease.splits().isEmpty()) {
            LOG.info("No need to checkpoint, current assigned mq is same as before.");
            return;
        }

        if (!increase.splits().isEmpty()) {
            reader.handleSplitsChanges(increase);
        }
        if (!decrease.splits().isEmpty()) {
            reader.handleSplitsChanges(decrease);
        }
    }

    /**
     * Builds one split per entry of {@code offsetTable} whose queue is contained in {@code
     * queues}, using the entry's offset pair and the given flag.
     *
     * <p>Iterating the table's entries (rather than looking each queue up again) keeps every
     * queue paired with its own offsets and needs only a single lookup per queue.
     *
     * @param queues queues to create splits for
     * @param offsetTable table supplying the offset pair of each queue
     * @param isIncrease flag forwarded as the last constructor argument of every split
     * @return a {@link SplitsAddition} wrapping the created splits, possibly empty
     */
    private static SplitsAddition<RocketMQSourceSplit> buildSplitsChange(
            Set<MessageQueue> queues,
            Map<MessageQueue, Tuple2<Long, Long>> offsetTable,
            boolean isIncrease) {
        return new SplitsAddition<>(
                offsetTable.entrySet().stream()
                        .filter(entry -> queues.contains(entry.getKey()))
                        .map(
                                entry ->
                                        new RocketMQSourceSplit(
                                                entry.getKey(),
                                                entry.getValue().f0,
                                                entry.getValue().f1,
                                                isIncrease))
                        .collect(Collectors.toList()));
    }