public void handleSplitsChanges()

in src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java [138:187]


    public void handleSplitsChanges(SplitsChange<RocketMQSourceSplit> splitsChange) {
        // Current not support assign addition splits.
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "The SplitChange type of %s is not supported.",
                            splitsChange.getClass()));
        }

        // Assignment.
        ConcurrentMap<MessageQueue, Tuple2<Long, Long>> newOffsetTable = new ConcurrentHashMap<>();

        // Set up the stopping timestamps.
        splitsChange
                .splits()
                .forEach(
                        split -> {
                            MessageQueue messageQueue =
                                    new MessageQueue(
                                            split.getTopic(),
                                            split.getBrokerName(),
                                            split.getQueueId());
                            newOffsetTable.put(
                                    messageQueue,
                                    new Tuple2<>(
                                            split.getStartingOffset(), split.getStoppingOffset()));
                            rocketmqSourceReaderMetrics.registerNewMessageQueue(messageQueue);
                        });

        // todo: log message queue change

        // It will replace the previous assignment
        Set<MessageQueue> incrementalSplits = newOffsetTable.keySet();
        consumer.assign(incrementalSplits);

        // set offset to consumer
        for (Map.Entry<MessageQueue, Tuple2<Long, Long>> entry : newOffsetTable.entrySet()) {
            MessageQueue messageQueue = entry.getKey();
            Long startingOffset = entry.getValue().f0;
            try {
                consumer.seek(messageQueue, startingOffset);
            } catch (Exception e) {
                String info =
                        String.format(
                                "messageQueue:%s, seek to starting offset:%s",
                                messageQueue, startingOffset);
                throw new FlinkRuntimeException(info, e);
            }
        }
    }