public Map getMessageQueueOffsets()

in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/offset/OffsetsSelectorBySpecified.java [44:75]


    /**
     * Resolves a starting offset for each of the given message queues.
     *
     * <p>Resolution order per queue:
     *
     * <ol>
     *   <li>the value found in {@code initialOffsets}, if any;
     *   <li>otherwise the value returned by {@code offsetsRetriever.committedOffsets(...)};
     *   <li>otherwise the value chosen by {@code offsetResetStrategy}: {@code EARLIEST} uses
     *       {@code minOffsets(...)}, {@code LATEST} uses {@code maxOffsets(...)}.
     * </ol>
     *
     * @param messageQueues the queues to resolve offsets for
     * @param offsetsRetriever used to look up committed, minimum and maximum offsets
     * @return a new map from message queue to its resolved offset
     * @throws IllegalStateException if some queue is still unresolved after the committed-offset
     *     lookup and {@code offsetResetStrategy} is neither {@code EARLIEST} nor {@code LATEST}
     */
    public Map<MessageQueue, Long> getMessageQueueOffsets(
            Collection<MessageQueue> messageQueues, MessageQueueOffsetsRetriever offsetsRetriever) {
        Map<MessageQueue, Long> offsets = new HashMap<>();
        List<MessageQueue> toLookup = new ArrayList<>();
        for (MessageQueue tp : messageQueues) {
            Long offset = initialOffsets.get(tp);
            if (offset == null) {
                toLookup.add(tp);
            } else {
                offsets.put(tp, offset);
            }
        }
        if (toLookup.isEmpty()) {
            return offsets;
        }

        // First check the committed offsets.
        Map<MessageQueue, Long> committedOffsets = offsetsRetriever.committedOffsets(toLookup);
        offsets.putAll(committedOffsets);
        toLookup.removeAll(committedOffsets.keySet());
        if (toLookup.isEmpty()) {
            // Everything was resolved from committed offsets: do not consult the reset strategy,
            // otherwise a strategy other than EARLIEST/LATEST would throw for an empty set and
            // EARLIEST/LATEST would issue a pointless lookup.
            return offsets;
        }

        // Fall back to the reset strategy for queues that have no committed offset.
        switch (offsetResetStrategy) {
            case EARLIEST:
                offsets.putAll(offsetsRetriever.minOffsets(toLookup));
                break;
            case LATEST:
                offsets.putAll(offsetsRetriever.maxOffsets(toLookup));
                break;
            default:
                throw new IllegalStateException(
                        "Cannot find initial offsets for partitions: " + toLookup);
        }
        return offsets;
    }