in src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java [447:477]
public Map<MessageQueue, Long> offsetsForTimes(
Map<MessageQueue, Long> messageQueueWithTimeMap) {
Map<MessageQueue, Long> result = new ConcurrentHashMap<>();
CompletableFuture.allOf(
messageQueueWithTimeMap.entrySet().stream()
.map(
entry ->
CompletableFuture.supplyAsync(
() ->
innerConsumer
.seekOffsetByTimestamp(
entry
.getKey(),
entry
.getValue()))
.thenAccept(
future -> {
try {
result.put(
entry.getKey(),
future.get());
} catch (Exception e) {
LOG.error(
"Consumer offsets retriever fetch offset by timestamp error",
e);
}
}))
.toArray(CompletableFuture[]::new))
.join();
return result;
}