public void handleSplitsChanges()

in src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java [157:221]


    /**
     * Applies a split change to this reader and re-assigns the underlying consumer.
     *
     * <p>On the first invocation a {@link SourceInitAssignEvent} carrying the received splits is
     * sent to the coordinator. Every split is then processed while holding {@code lock}: a split
     * whose {@code getIsIncrease()} is {@code false} is finished via {@code finishSplitAtRecord},
     * and an increasing split whose message queue is not yet a key of {@code currentOffsetTable}
     * is passed to {@code registerSplits}. Afterwards the consumer is assigned the key set of
     * {@code currentOffsetTable} and is sought to the starting offset of each newly registered
     * queue.
     *
     * @param splitsChange the change to apply; only {@link SplitsAddition} is supported
     * @throws UnsupportedOperationException if {@code splitsChange} is not a {@link
     *     SplitsAddition}
     * @throws FlinkRuntimeException if seeking a newly registered queue to its starting offset
     *     fails
     */
    public void handleSplitsChanges(SplitsChange<RocketMQSourceSplit> splitsChange) {
        // Only split additions are supported; reject every other kind of change.
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "The SplitChange type of %s is not supported.",
                            splitsChange.getClass()));
        }

        // Report the initial assignment to the coordinator once, on the first call only.
        if (!initFinish) {
            LOG.info("Start to init reader");
            SourceInitAssignEvent sourceEvent = new SourceInitAssignEvent();
            sourceEvent.setSplits(splitsChange.splits());
            sourceReaderContext.sendSourceEventToCoordinator(sourceEvent);
            initFinish = true;
        }

        // Queues registered by this call, mapped to their (starting, stopping) offsets.
        ConcurrentMap<MessageQueue, Tuple2<Long, Long>> newOffsetTable = new ConcurrentHashMap<>();

        // Acquire the lock immediately before the try so that nothing can throw between
        // lock() and the finally block that releases it.
        lock.lock();
        try {
            LOG.info("Receive split change: {}", splitsChange.splits());

            for (RocketMQSourceSplit split : splitsChange.splits()) {
                MessageQueue messageQueue = split.getMessageQueue();
                if (!split.getIsIncrease()) {
                    // Non-increasing split: finish it at its stopping offset.
                    finishSplitAtRecord(
                            messageQueue, split.getStoppingOffset(), recordsWithSplitIds);
                } else if (!currentOffsetTable.containsKey(messageQueue)) {
                    // Queue not tracked yet: register it and remember it for the seek below.
                    // NOTE(review): registerSplits presumably adds the queue to
                    // currentOffsetTable -- confirm against its implementation.
                    registerSplits(split);
                    newOffsetTable.put(
                            messageQueue,
                            new Tuple2<>(split.getStartingOffset(), split.getStoppingOffset()));
                }
            }
        } finally {
            lock.unlock();
        }

        // It will replace the previous assignment
        consumer.assign(currentOffsetTable.keySet());

        // Position the consumer at the starting offset of every newly registered queue.
        for (Map.Entry<MessageQueue, Tuple2<Long, Long>> entry : newOffsetTable.entrySet()) {
            MessageQueue messageQueue = entry.getKey();
            Long startingOffset = entry.getValue().f0;
            try {
                // A starting offset of -1 is replaced by 0.
                // NOTE(review): -1 presumably means "no offset available" -- confirm.
                consumer.seek(messageQueue, startingOffset == -1L ? 0L : startingOffset);
            } catch (Exception e) {
                String info =
                        String.format(
                                "messageQueue:%s, seek to starting offset:%s",
                                messageQueue, startingOffset);
                throw new FlinkRuntimeException(info, e);
            }
        }
    }