in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java [535:566]
/**
 * Fetches the offsets committed by the consumer group {@code groupId} for the given
 * partitions.
 *
 * <p>Blocks until the admin client's {@code listConsumerGroupOffsets} request completes.
 * Partitions whose entry in the response is {@code null} (per the Kafka admin API, the value
 * reported when the group has no committed offset for that partition) are left out of the
 * returned map.
 *
 * @param partitions the partitions to look up; copied into a new list before being passed on
 * @return a new map from partition to its committed offset
 * @throws FlinkRuntimeException if the calling thread is interrupted while waiting (the
 *     interrupt flag is restored first) or if the request fails
 */
public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) {
    ListConsumerGroupOffsetsOptions options =
            new ListConsumerGroupOffsetsOptions()
                    .topicPartitions(new ArrayList<>(partitions));
    try {
        return adminClient
                .listConsumerGroupOffsets(groupId, options)
                .partitionsToOffsetAndMetadata()
                .thenApply(
                        result -> {
                            Map<TopicPartition, Long> offsets = new HashMap<>();
                            result.forEach(
                                    (tp, oam) -> {
                                        // Skip null entries instead of failing on oam.offset().
                                        if (oam != null) {
                                            offsets.put(tp, oam.offset());
                                        }
                                    });
                            return offsets;
                        })
                .get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new FlinkRuntimeException(
                "Interrupted while listing offsets for consumer group " + groupId, e);
    } catch (ExecutionException e) {
        // ExecutionException is only a wrapper around the real failure. Spell the underlying
        // reason out in the message so it is complete even where only getMessage() is shown;
        // the original exception stays attached as the cause to keep the full chain.
        Throwable reason = e.getCause() != null ? e.getCause() : e;
        throw new FlinkRuntimeException(
                "Failed to fetch committed offsets for consumer group "
                        + groupId
                        + " due to "
                        + reason,
                e);
    }
}