public Map getPartitionOffsets()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java [60:93]


    /**
     * Resolves a starting offset for each of the given partitions.
     *
     * <p>Each partition is resolved by the first source that yields a value:
     *
     * <ol>
     *   <li>the entry for that partition in {@code initialOffsets}, if one exists;
     *   <li>otherwise the value returned by {@code
     *       partitionOffsetsRetriever.committedOffsets(...)};
     *   <li>otherwise the beginning or end offset reported by the retriever, chosen by {@code
     *       offsetResetStrategy} ({@code EARLIEST} or {@code LATEST}).
     * </ol>
     *
     * @param partitions the partitions to resolve offsets for
     * @param partitionOffsetsRetriever used to look up committed, beginning and end offsets for
     *     partitions that have no entry in {@code initialOffsets}
     * @return a new map from partition to its resolved offset
     * @throws IllegalStateException if some partitions have neither a specified nor a committed
     *     offset and {@code offsetResetStrategy} is neither {@code EARLIEST} nor {@code LATEST}
     */
    public Map<TopicPartition, Long> getPartitionOffsets(
            Collection<TopicPartition> partitions,
            PartitionOffsetsRetriever partitionOffsetsRetriever) {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        List<TopicPartition> toLookup = new ArrayList<>();
        for (TopicPartition tp : partitions) {
            Long offset = initialOffsets.get(tp);
            if (offset == null) {
                // No explicitly specified offset; resolve it through the retriever below.
                toLookup.add(tp);
            } else {
                offsets.put(tp, offset);
            }
        }
        if (toLookup.isEmpty()) {
            // Every partition had a specified offset; no lookups are needed.
            return offsets;
        }

        // First check the committed offsets.
        Map<TopicPartition, Long> committedOffsets =
                partitionOffsetsRetriever.committedOffsets(toLookup);
        offsets.putAll(committedOffsets);
        toLookup.removeAll(committedOffsets.keySet());

        // Only fall back to the reset strategy when something is still unresolved. Without this
        // guard, a strategy other than EARLIEST/LATEST would throw even though the committed
        // offsets already covered every remaining partition, and EARLIEST/LATEST would issue a
        // pointless lookup for an empty partition list.
        if (!toLookup.isEmpty()) {
            switch (offsetResetStrategy) {
                case EARLIEST:
                    offsets.putAll(partitionOffsetsRetriever.beginningOffsets(toLookup));
                    break;
                case LATEST:
                    offsets.putAll(partitionOffsetsRetriever.endOffsets(toLookup));
                    break;
                default:
                    throw new IllegalStateException(
                            "Cannot find initial offsets for partitions: " + toLookup);
            }
        }
        return offsets;
    }