in src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java [360:386]
public Map<MessageQueue, Long> committedOffsets(Collection<MessageQueue> messageQueues) {
Map<MessageQueue, Long> result = new ConcurrentHashMap<>();
CompletableFuture.allOf(
messageQueues.stream()
.map(
messageQueue ->
CompletableFuture.supplyAsync(
() ->
innerConsumer
.seekCommittedOffset(
messageQueue))
.thenAccept(
future -> {
try {
result.put(
messageQueue,
future.get());
} catch (Exception e) {
LOG.error(
"Consumer offsets retriever fetch committed offset error",
e);
}
}))
.toArray(CompletableFuture[]::new))
.join();
return result;
}