public CompletableFuture seekCommittedOffset()

in src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java [236:265]


    /**
     * Asynchronously resolves the offset to resume from for the given message queue.
     *
     * <p>The lookup runs on {@code commonExecutorService}. The offset is read from the
     * consumer's offset store with {@code ReadOffsetType.READ_FROM_STORE}; when that read yields
     * {@code -1}, the queue's minimum offset reported by {@code adminExt} is used instead.
     *
     * @param messageQueue the queue whose committed offset should be looked up
     * @return a future that completes with the resolved offset; if the lookup throws, the
     *     failure is logged and the future completes exceptionally with a
     *     {@link RuntimeException} wrapping the original cause
     */
    public CompletableFuture<Long> seekCommittedOffset(MessageQueue messageQueue) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        final long storedOffset =
                                consumer.getOffsetStore()
                                        .readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);

                        final long resolvedOffset;
                        if (storedOffset != -1) {
                            resolvedOffset = storedOffset;
                        } else {
                            // The store returned -1, so fall back to the queue's minimum offset.
                            resolvedOffset = adminExt.minOffset(messageQueue);
                            LOG.info(
                                    "Consumer seek committed offset from remote, offset=-1,mq={},use minOffset={}",
                                    UtilAll.getQueueDescription(messageQueue),
                                    resolvedOffset);
                        }

                        // Logged on every successful lookup, including after the fallback above.
                        LOG.info(
                                "Consumer seek committed offset from remote, mq={}, offset={}",
                                UtilAll.getQueueDescription(messageQueue),
                                resolvedOffset);
                        return resolvedOffset;
                    } catch (Exception cause) {
                        LOG.error(
                                "Consumer seek committed offset from remote error, mq={}",
                                UtilAll.getQueueDescription(messageQueue),
                                cause);
                        // Rethrow unchecked (cause preserved) so the returned future fails.
                        throw new RuntimeException(cause);
                    }
                },
                commonExecutorService);
    }