in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java [577:609]
/**
 * Requests offsets for the given topic partitions from the admin client and blocks until the
 * combined result is available.
 *
 * <p>Partitions whose returned result info is {@code null} are left out of the returned map.
 *
 * @param topicPartitionOffsets the offset spec to request for each topic partition
 * @return a new map from topic partition to its non-null list-offsets result info
 * @throws FlinkRuntimeException if the calling thread is interrupted while waiting (the
 *     interrupt flag is restored first), or if the request completes exceptionally
 */
private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets(
        Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
    try {
        return adminClient
                .listOffsets(topicPartitionOffsets)
                .all()
                .thenApply(
                        fetched -> {
                            final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>
                                    nonNullInfos = new HashMap<>();
                            for (Map.Entry<
                                            TopicPartition,
                                            ListOffsetsResult.ListOffsetsResultInfo>
                                    entry : fetched.entrySet()) {
                                final ListOffsetsResult.ListOffsetsResultInfo info =
                                        entry.getValue();
                                // Skip partitions for which no result info was returned.
                                if (info == null) {
                                    continue;
                                }
                                nonNullInfos.put(entry.getKey(), info);
                            }
                            return nonNullInfos;
                        })
                .get();
    } catch (ExecutionException e) {
        final String failureMessage =
                "Failed to list offsets for topic partitions: "
                        + topicPartitionOffsets
                        + " due to";
        throw new FlinkRuntimeException(failureMessage, e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        final String interruptedMessage =
                "Interrupted while listing offsets for topic partitions: "
                        + topicPartitionOffsets;
        throw new FlinkRuntimeException(interruptedMessage, e);
    }
}