private void handleSourceQueueChange()

in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [300:316]


    private void handleSourceQueueChange(Set<MessageQueue> latestSet, Throwable t) {
        if (t != null) {
            throw new FlinkRuntimeException("Failed to handle source splits change due to ", t);
        }
        if (latestSet == null) {
            return;
        }

        final SourceChangeResult sourceChangeResult = getSourceChangeResult(latestSet);
        if (sourceChangeResult.isEmpty()) {
            log.debug("Skip handle source allocated due to not queue change");
            return;
        }

        context.callAsync(
                () -> initializeSourceSplits(sourceChangeResult), this::handleSplitChanges);
    }