in core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java [50:76]
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
Set<MessageQueue> ownedQueues = ownedMapping.computeIfAbsent(topic, s -> new HashSet<>());
HashSet<MessageQueue> addQueue = new HashSet<>(mqDivided);
addQueue.removeAll(ownedQueues);
HashSet<MessageQueue> removeQueue = new HashSet<>(ownedQueues);
removeQueue.removeAll(mqDivided);
ownedQueues.addAll(new HashSet<>(addQueue));
ownedQueues.removeAll(new HashSet<>(removeQueue));
//从shuffle topic中读出的数据才能进行有状态计算。
if (topic.endsWith(Constant.SHUFFLE_TOPIC_SUFFIX)) {
Throwable throwable = this.recoverHandler.apply(addQueue, removeQueue);
if (throwable != null) {
throw new RuntimeException(throwable);
}
logger.info("recover messageQueue finish, addQueue: [{}], removeQueue:[{}].", addQueue, removeQueue);
}
buildTask(addQueue);
//设计的不太好,移除q,添加消费任务之前,应该加一个状态移除函数;目前这样写的问题是:状态提前移除/加载了,consumer其实仍然在从某个将要移除的q中拉取数据,但是状态却被移除了。
//也不能把originListener.messageQueueChanged放在loadState/removeState之前,那样会已经在拉取数据了,但是状态没有加载好。
originListener.messageQueueChanged(topic, mqAll, mqDivided);
removeTask(removeQueue);
}