in adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java [148:184]
/**
 * Reconciles the running pull tasks with the queues currently assigned to this consumer.
 *
 * <p>Queues tracked in {@code runningQueueMap} that are no longer assigned are removed and
 * their {@link ProcessQueue} is marked dropped. Each assigned queue that is not yet tracked
 * is registered and a {@code PullTask} is submitted for it.
 *
 * @param topic          topic the queues belong to; used for logging only
 * @param tag            tag handed to every newly created {@code PullTask}
 * @param assignedQueues queues currently assigned; {@code null} or empty means nothing is
 *                       assigned, in which case every running queue is dropped
 */
private void submitPullTask(String topic, String tag, Set<MessageQueue> assignedQueues) {
    // Evaluate emptiness once (null-safe) so the cleanup loop never dereferences a null set.
    boolean noneAssigned = CollectionUtils.isEmpty(assignedQueues);

    // Drop every running queue that is no longer part of the assignment.
    for (MessageQueue runningQueue : runningQueueMap.keySet()) {
        if (runningQueue == null || noneAssigned || !assignedQueues.contains(runningQueue)) {
            ProcessQueue processQueue = runningQueueMap.remove(runningQueue);
            if (processQueue != null) {
                processQueue.setDropped(true);
            }
        }
    }

    if (noneAssigned) {
        log.warn("Not found any messageQueue, topic:{}", topic);
        return;
    }

    for (MessageQueue messageQueue : assignedQueues) {
        ProcessQueue processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
            .getProcessQueueTable().get(messageQueue);
        if (processQueue == null) {
            // The rebalance table has no entry for this queue. Registering a null value would
            // either throw (null-hostile maps) or leave a mapping that looks absent on the next
            // call, so skip it and let the next rebalance retry.
            log.warn("Not found processQueue for {}, {}, wait next balancing", topic, messageQueue);
            continue;
        }
        if (runningQueueMap.putIfAbsent(messageQueue, processQueue) != null) {
            // Already tracked: a pull task was submitted for this queue earlier.
            continue;
        }
        try {
            PullTask pullTask = new PullTask(messageQueue, tag);
            pullImmediately(pullTask);
            log.info("Submit pullTask:{}", messageQueue);
        } catch (Exception e) {
            log.error("Failed submit pullTask:{}, {}, wait next balancing", topic, messageQueue, e);
            // Submitting the pull task failed: undo the registration on both sides and wait
            // for the next rebalance to try again.
            processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
                .getProcessQueueTable().remove(messageQueue);
            if (processQueue != null) {
                processQueue.setDropped(true);
            }
            runningQueueMap.remove(messageQueue);
        }
    }
}