in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [300:316]
/**
 * Handles the outcome of a message-queue lookup and, when the queue set has changed,
 * schedules the follow-up split initialization.
 *
 * <p>Flow:
 *
 * <ol>
 *   <li>A non-null {@code t} is rethrown wrapped in a {@link FlinkRuntimeException},
 *       keeping the original throwable as the cause.
 *   <li>A {@code null} {@code latestSet} is ignored without any further action.
 *   <li>Otherwise the change result is computed via {@code getSourceChangeResult}; an
 *       empty result is only logged at debug level.
 *   <li>A non-empty result is handed to {@code context.callAsync}, which is given
 *       {@code initializeSourceSplits} as the task and {@code handleSplitChanges} as the
 *       result handler.
 * </ol>
 *
 * @param latestSet the most recently observed set of message queues; may be {@code null}
 * @param t the failure reported for the lookup, or {@code null} when there was none
 * @throws FlinkRuntimeException if {@code t} is not {@code null}
 */
private void handleSourceQueueChange(Set<MessageQueue> latestSet, Throwable t) {
    // A reported failure always wins: surface it before looking at the queue set.
    if (t != null) {
        throw new FlinkRuntimeException("Failed to handle source splits change due to ", t);
    }

    if (latestSet != null) {
        final SourceChangeResult queueChanges = getSourceChangeResult(latestSet);

        if (queueChanges.isEmpty()) {
            // Nothing differs from the previously known queues, so no work is scheduled.
            log.debug("Skip handle source allocated due to not queue change");
        } else {
            context.callAsync(
                    () -> initializeSourceSplits(queueChanges), this::handleSplitChanges);
        }
    }
}