public Map minOffsets()

in src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java [389:415]


        /**
         * Looks up the minimum offset of every given message queue and blocks until all
         * lookups have finished.
         *
         * <p>One asynchronous task is started per queue; each task calls {@code
         * innerConsumer.seekMinOffset(messageQueue)} and then waits on the handle it returns.
         * Because no executor is passed to {@code supplyAsync}, the tasks run on the default
         * asynchronous execution facility of {@link CompletableFuture}.
         *
         * <p>Lookups are best-effort: when waiting for a queue's offset fails, the error is
         * logged and that queue is simply absent from the returned map. Callers must therefore
         * not assume that every requested queue has an entry. A failure thrown by the {@code
         * seekMinOffset} call itself is not caught here and surfaces from {@code join()}.
         *
         * @param messageQueues the queues to query
         * @return a map from each successfully queried queue to its minimum offset
         */
        public Map<MessageQueue, Long> minOffsets(Collection<MessageQueue> messageQueues) {
            // Written concurrently from the asynchronous tasks, hence a concurrent map.
            Map<MessageQueue, Long> result = new ConcurrentHashMap<>();

            CompletableFuture<?>[] lookups =
                    messageQueues.stream()
                            .map(
                                    messageQueue ->
                                            CompletableFuture.supplyAsync(
                                                            () ->
                                                                    innerConsumer.seekMinOffset(
                                                                            messageQueue))
                                                    .thenAccept(
                                                            future -> {
                                                                try {
                                                                    result.put(
                                                                            messageQueue,
                                                                            future.get());
                                                                } catch (Exception e) {
                                                                    if (e
                                                                            instanceof
                                                                            InterruptedException) {
                                                                        // Do not lose the
                                                                        // interruption: restore
                                                                        // the flag so the running
                                                                        // thread can still react
                                                                        // to it.
                                                                        Thread.currentThread()
                                                                                .interrupt();
                                                                    }
                                                                    LOG.error(
                                                                            "Consumer offsets retriever fetch min offset error",
                                                                            e);
                                                                }
                                                            }))
                            .toArray(CompletableFuture[]::new);

            // Wait for every per-queue lookup before handing the map back to the caller.
            CompletableFuture.allOf(lookups).join();
            return result;
        }