long nextPullOffset()

in adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java [64:113]


    long nextPullOffset(MessageQueue remoteQueue) {
        final AtomicReference<RuntimeException> outerException = new AtomicReference<>();
        final Long existsOffset = pullOffsetTable.computeIfAbsent(remoteQueue, messageQueue -> {
            try {
                // Got -1 when MQBrokerException occurred, aka broker returns non-0 code, e.g. ResponseCode.QUERY_NOT_FOUND
                // or any runtime exception thrown from rpc hook.
                // Got -2 when other exception occurred, include broker not exists, network or client exception
                long offset = rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false);
                if (offset == -2) {
                    outerException.set(new RuntimeException("fetchConsumeOffsetFromBroker exception, please check rocketmq client for more details"));
                    return null;
                }
                if (offset == -1 || offset == 0) {
                    // Follow the CONSUME_FROM_WHERE to compute next pull offset
                    // But note that if broker thrown any unexpected runtime exception may cause offset rollback.
                    // We don't handle this risk because of MetaQ doesn't have any rpc hook
                    final ConsumeFromWhere fromWhere = clientConfig.getConsumeFromWhere();
                    switch (fromWhere) {
                        case CONSUME_FROM_LAST_OFFSET:
                            offset = this.rocketmqPullConsumer.maxOffset(remoteQueue);
                            break;
                        case CONSUME_FROM_FIRST_OFFSET:
                            offset = this.rocketmqPullConsumer.minOffset(remoteQueue);
                            break;
                        case CONSUME_FROM_TIMESTAMP:
                            offset = this.rocketmqPullConsumer.searchOffset(remoteQueue, clientConfig.getConsumeTimestamp());
                            break;
                    }
                }
                if (offset >= 0) {
                    // Got an offset from offset store
                    return offset;
                }
                // Maybe wrong ConsumeFromWhere configured, so couldn't find any offset
                return null;
            } catch (Exception e) {
                outerException.set(new RuntimeException(e));
                return null;
            }
        });
        if (outerException.get() != null) {
            throw outerException.get();
        }
        if (existsOffset == null) {
            final String errorMsg = "[BUG] No offset found in offset store or pullOffsetTable without any exception";
            log.warn(errorMsg);
            throw new RuntimeException(errorMsg);
        }
        return existsOffset;
    }