private void submitPullTask()

in adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java [148:184]


    private void submitPullTask(String topic, String tag, Set<MessageQueue> assignedQueues) {
        Set<MessageQueue> runningQueues = runningQueueMap.keySet();
        for (MessageQueue runningQueue : runningQueues) {
            if (runningQueue == null || !assignedQueues.contains(runningQueue)) {
                ProcessQueue processQueue = runningQueueMap.remove(runningQueue);
                if (processQueue != null) {
                    processQueue.setDropped(true);
                }
            }
        }
        if (CollectionUtils.isEmpty(assignedQueues)) {
            log.warn("Not found any messageQueue, topic:{}", topic);
            return;
        }

        for (MessageQueue messageQueue : assignedQueues) {
            ProcessQueue processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
                    .getProcessQueueTable().get(messageQueue);
            if (runningQueueMap.putIfAbsent(messageQueue, processQueue) == null) {
                try {
                    PullTask pullTask = new PullTask(messageQueue, tag);
                    pullImmediately(pullTask);
                    log.info("Submit pullTask:{}", messageQueue);
                } catch (Exception e) {
                    log.error("Failed submit pullTask:{}, {}, wait next balancing", topic, messageQueue, e);
                    // 添加pull失败,等待下次 rebalance
                    processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
                            .getProcessQueueTable().remove(messageQueue);
                    if (processQueue != null) {
                        processQueue.setDropped(true);
                    }
                    runningQueueMap.remove(messageQueue);
                }
            }
        }

    }