in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/offset/OffsetsSelectorBySpecified.java [44:75]
/**
 * Resolves the starting offset for each of the given message queues.
 *
 * <p>Resolution order per queue:
 *
 * <ol>
 *   <li>the offset explicitly configured in {@code initialOffsets}, if present;
 *   <li>otherwise the offset returned by {@code offsetsRetriever.committedOffsets(...)};
 *   <li>otherwise the offset chosen by {@code offsetResetStrategy}: {@code minOffsets} for
 *       {@code EARLIEST}, {@code maxOffsets} for {@code LATEST}.
 * </ol>
 *
 * <p>The reset strategy is consulted only when at least one queue is still unresolved after
 * the first two steps, so a strategy other than {@code EARLIEST}/{@code LATEST} no longer
 * fails when every queue already has an offset.
 *
 * @param messageQueues the queues whose starting offsets are requested
 * @param offsetsRetriever used to look up committed, minimum and maximum offsets
 * @return a new map from message queue to its resolved starting offset
 * @throws IllegalStateException if some queue has neither a specified nor a committed offset
 *     and the reset strategy is neither {@code EARLIEST} nor {@code LATEST}
 */
public Map<MessageQueue, Long> getMessageQueueOffsets(
        Collection<MessageQueue> messageQueues, MessageQueueOffsetsRetriever offsetsRetriever) {
    Map<MessageQueue, Long> offsets = new HashMap<>();
    List<MessageQueue> toLookup = new ArrayList<>();
    for (MessageQueue messageQueue : messageQueues) {
        Long offset = initialOffsets.get(messageQueue);
        if (offset == null) {
            toLookup.add(messageQueue);
        } else {
            offsets.put(messageQueue, offset);
        }
    }
    if (toLookup.isEmpty()) {
        // Every requested queue had an explicitly specified offset.
        return offsets;
    }

    // First check the committed offsets.
    Map<MessageQueue, Long> committedOffsets = offsetsRetriever.committedOffsets(toLookup);
    offsets.putAll(committedOffsets);
    toLookup.removeAll(committedOffsets.keySet());
    if (toLookup.isEmpty()) {
        // Nothing left to resolve: skip the reset strategy so that we neither issue an
        // empty lookup nor throw for a strategy that cannot supply offsets.
        return offsets;
    }

    // Fall back to the reset strategy for queues without a specified or committed offset.
    switch (offsetResetStrategy) {
        case EARLIEST:
            offsets.putAll(offsetsRetriever.minOffsets(toLookup));
            break;
        case LATEST:
            offsets.putAll(offsetsRetriever.maxOffsets(toLookup));
            break;
        default:
            throw new IllegalStateException(
                    "Cannot find initial offsets for partitions: " + toLookup);
    }
    return offsets;
}