public void run()

in adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java [215:295]


        public void run() {
            try {
                if (!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState())) {
                    log.warn("RocketmqPullConsumer not running, pullTask exit.");
                    return;
                }
                ProcessQueue processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
                        .getProcessQueueTable().get(messageQueue);
                if (processQueue == null || processQueue.isDropped()) {
                    log.info("ProcessQueue {} dropped, pullTask exit", messageQueue);
                    return;
                }
                long offset = localMessageCache.nextPullOffset(messageQueue);

                rocketmqPullConsumer.pullBlockIfNotFound(this.messageQueue, this.tag, offset, PULL_BATCH_NUM, new PullCallback() {
                    @Override
                    public void onSuccess(PullResult pullResult) {
                        try {
                            if (!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState())) {
                                log.warn("rocketmqPullConsumer not running, pullTask exit.");
                                return;
                            }

                            ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
                                    .getProcessQueueTable().get(messageQueue);
                            switch (pullResult.getPullStatus()) {
                                case FOUND:
                                    if (pq != null && !pq.isDropped()) {
                                        pq.putMessage(pullResult.getMsgFoundList());
                                        for (final MessageExt messageExt : pullResult.getMsgFoundList()) {
                                            localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, messageQueue, pq), Long.MAX_VALUE);
                                        }
                                        localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
                                        pullImmediately(PullTask.this);
                                    } else {
                                        localMessageCache.removePullOffset(messageQueue);
                                        log.info("ProcessQueue {} dropped, discard the pulled message.", messageQueue);
                                    }
                                    break;
                                case OFFSET_ILLEGAL:
                                    log.warn("The pull request offset is illegal, offset is {}, message queue is {}, " +
                                                    "pull result is {}, delay {} ms for next pull",
                                            offset, messageQueue, pullResult, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                                    localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
                                    pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
                                    break;
                                case NO_NEW_MSG:
                                case NO_MATCHED_MSG:
                                    log.info("No NEW_MSG or MATCHED_MSG for mq:{}, pull again.", messageQueue);
                                    localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
                                    pullImmediately(PullTask.this);
                                    break;
                                default:
                                    log.warn("Failed to process pullResult, mq:{} {}", messageQueue, pullResult);
                                    break;
                            }
                        } catch (Throwable t) {
                            log.error("Exception occurs when process pullResult", t);
                            pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
                        }
                    }

                    @Override
                    public void onException(Throwable e) {
                        long delayTimeMillis = 0L;
                        if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
                            delayTimeMillis = PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
                        } else {
                            delayTimeMillis = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
                        }
                        log.error("Exception happens when pull message process, delay {} ms for message queue {}",
                                delayTimeMillis, messageQueue, e);
                        pullLater(PullTask.this, delayTimeMillis, TimeUnit.MILLISECONDS);
                    }
                });
            } catch (Throwable t) {
                log.error("Error occurs when pull message process, delay {} ms for message queue {}",
                        PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, messageQueue, t);
                pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
            }
        }