public Map offsetsForTimes()

in src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java [447:477]


        public Map<MessageQueue, Long> offsetsForTimes(
                Map<MessageQueue, Long> messageQueueWithTimeMap) {
            Map<MessageQueue, Long> result = new ConcurrentHashMap<>();
            CompletableFuture.allOf(
                            messageQueueWithTimeMap.entrySet().stream()
                                    .map(
                                            entry ->
                                                    CompletableFuture.supplyAsync(
                                                                    () ->
                                                                            innerConsumer
                                                                                    .seekOffsetByTimestamp(
                                                                                            entry
                                                                                                    .getKey(),
                                                                                            entry
                                                                                                    .getValue()))
                                                            .thenAccept(
                                                                    future -> {
                                                                        try {
                                                                            result.put(
                                                                                    entry.getKey(),
                                                                                    future.get());
                                                                        } catch (Exception e) {
                                                                            LOG.error(
                                                                                    "Consumer offsets retriever fetch offset by timestamp error",
                                                                                    e);
                                                                        }
                                                                    }))
                                    .toArray(CompletableFuture[]::new))
                    .join();
            return result;
        }