in adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java [64:113]
long nextPullOffset(MessageQueue remoteQueue) {
final AtomicReference<RuntimeException> outerException = new AtomicReference<>();
final Long existsOffset = pullOffsetTable.computeIfAbsent(remoteQueue, messageQueue -> {
try {
// Got -1 when MQBrokerException occurred, aka broker returns non-0 code, e.g. ResponseCode.QUERY_NOT_FOUND
// or any runtime exception thrown from rpc hook.
// Got -2 when other exception occurred, include broker not exists, network or client exception
long offset = rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false);
if (offset == -2) {
outerException.set(new RuntimeException("fetchConsumeOffsetFromBroker exception, please check rocketmq client for more details"));
return null;
}
if (offset == -1 || offset == 0) {
// Follow the CONSUME_FROM_WHERE to compute next pull offset
// But note that if broker thrown any unexpected runtime exception may cause offset rollback.
// We don't handle this risk because of MetaQ doesn't have any rpc hook
final ConsumeFromWhere fromWhere = clientConfig.getConsumeFromWhere();
switch (fromWhere) {
case CONSUME_FROM_LAST_OFFSET:
offset = this.rocketmqPullConsumer.maxOffset(remoteQueue);
break;
case CONSUME_FROM_FIRST_OFFSET:
offset = this.rocketmqPullConsumer.minOffset(remoteQueue);
break;
case CONSUME_FROM_TIMESTAMP:
offset = this.rocketmqPullConsumer.searchOffset(remoteQueue, clientConfig.getConsumeTimestamp());
break;
}
}
if (offset >= 0) {
// Got an offset from offset store
return offset;
}
// Maybe wrong ConsumeFromWhere configured, so couldn't find any offset
return null;
} catch (Exception e) {
outerException.set(new RuntimeException(e));
return null;
}
});
if (outerException.get() != null) {
throw outerException.get();
}
if (existsOffset == null) {
final String errorMsg = "[BUG] No offset found in offset store or pullOffsetTable without any exception";
log.warn(errorMsg);
throw new RuntimeException(errorMsg);
}
return existsOffset;
}