in adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java [215:295]
public void run() {
    // One pull round for this task's message queue: check that the consumer is running and the
    // queue has not been dropped, issue a callback-based pull starting at the cached offset, and
    // let the callback decide whether to pull again immediately, pull again after a delay, or stop.
    try {
        if (!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState())) {
            log.warn("RocketmqPullConsumer not running, pullTask exit.");
            return;
        }
        ProcessQueue processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
            .getProcessQueueTable().get(messageQueue);
        if (processQueue == null || processQueue.isDropped()) {
            // No reschedule here: the task ends once its queue is gone.
            log.info("ProcessQueue {} dropped, pullTask exit", messageQueue);
            return;
        }
        long offset = localMessageCache.nextPullOffset(messageQueue);
        rocketmqPullConsumer.pullBlockIfNotFound(this.messageQueue, this.tag, offset, PULL_BATCH_NUM, new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                try {
                    // Re-check the consumer state: it may have changed while the pull was in flight.
                    if (!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState())) {
                        log.warn("rocketmqPullConsumer not running, pullTask exit.");
                        return;
                    }
                    // Look the process queue up again rather than reusing the one fetched before the pull.
                    ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
                        .getProcessQueueTable().get(messageQueue);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            if (pq != null && !pq.isDropped()) {
                                pq.putMessage(pullResult.getMsgFoundList());
                                for (final MessageExt messageExt : pullResult.getMsgFoundList()) {
                                    localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, messageQueue, pq), Long.MAX_VALUE);
                                }
                                // Advance the offset only after every message of the batch was handed over.
                                localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
                                pullImmediately(PullTask.this);
                            } else {
                                // Queue disappeared while pulling: forget its offset and end the task.
                                localMessageCache.removePullOffset(messageQueue);
                                log.info("ProcessQueue {} dropped, discard the pulled message.", messageQueue);
                            }
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("The pull request offset is illegal, offset is {}, message queue is {}, " +
                                    "pull result is {}, delay {} ms for next pull",
                                offset, messageQueue, pullResult, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                            // Continue from the offset carried by the pull result, after a delay.
                            localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
                            pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
                            break;
                        case NO_NEW_MSG:
                        case NO_MATCHED_MSG:
                            log.info("No NEW_MSG or MATCHED_MSG for mq:{}, pull again.", messageQueue);
                            localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
                            pullImmediately(PullTask.this);
                            break;
                        default:
                            // Unrecognised status: retry after a delay, like the other failure paths,
                            // so the pull loop for this queue does not silently stop.
                            log.warn("Failed to process pullResult, mq:{} {}", messageQueue, pullResult);
                            pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
                            break;
                    }
                } catch (Throwable t) {
                    log.error("Exception occurs when process pullResult", t);
                    pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
                }
            }

            @Override
            public void onException(Throwable e) {
                // Broker flow control gets its own back-off; every other failure uses the generic delay.
                final long delayTimeMillis;
                if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
                    delayTimeMillis = PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
                } else {
                    delayTimeMillis = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
                }
                log.error("Exception happens when pull message process, delay {} ms for message queue {}",
                    delayTimeMillis, messageQueue, e);
                pullLater(PullTask.this, delayTimeMillis, TimeUnit.MILLISECONDS);
            }
        });
    } catch (Throwable t) {
        // Failure before or while issuing the pull: log and retry after the generic delay.
        log.error("Error occurs when pull message process, delay {} ms for message queue {}",
            PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, messageQueue, t);
        pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
    }
}