in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java [60:93]
public Map<TopicPartition, Long> getPartitionOffsets(
        Collection<TopicPartition> partitions,
        PartitionOffsetsRetriever partitionOffsetsRetriever) {
    // Resolves a starting offset for each requested partition, trying in order:
    //   1. the explicitly specified offset in initialOffsets;
    //   2. the committed offset reported by the retriever;
    //   3. the beginning/end offset chosen by offsetResetStrategy.
    // Throws IllegalStateException when partitions remain unresolved after steps 1-2
    // and the reset strategy is neither EARLIEST nor LATEST.
    Map<TopicPartition, Long> offsets = new HashMap<>();
    List<TopicPartition> toLookup = new ArrayList<>();
    for (TopicPartition tp : partitions) {
        Long offset = initialOffsets.get(tp);
        if (offset == null) {
            toLookup.add(tp);
        } else {
            offsets.put(tp, offset);
        }
    }
    if (toLookup.isEmpty()) {
        // Every partition had an explicitly specified offset.
        return offsets;
    }

    // First check the committed offsets.
    // NOTE(review): assumes committedOffsets() only returns entries for partitions that
    // actually have a committed offset; confirm against the retriever implementation.
    Map<TopicPartition, Long> committedOffsets =
            partitionOffsetsRetriever.committedOffsets(toLookup);
    offsets.putAll(committedOffsets);
    toLookup.removeAll(committedOffsets.keySet());
    if (toLookup.isEmpty()) {
        // All remaining partitions were resolved from committed offsets. Returning here
        // avoids a spurious failure when the reset strategy has no fallback, and skips a
        // pointless retriever call with an empty partition list otherwise.
        return offsets;
    }

    // Fall back to the reset strategy for partitions that are still unresolved.
    switch (offsetResetStrategy) {
        case EARLIEST:
            offsets.putAll(partitionOffsetsRetriever.beginningOffsets(toLookup));
            break;
        case LATEST:
            offsets.putAll(partitionOffsetsRetriever.endOffsets(toLookup));
            break;
        default:
            throw new IllegalStateException(
                    "Cannot find initial offsets for partitions: " + toLookup);
    }
    return offsets;
}