in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [540:561]
public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues)
throws MQClientException {
Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
restoredOffsets.forEach(
(mq, offset) -> {
if (messageQueues.contains(mq)) {
offsetTable.put(mq, offset);
}
});
List<MessageQueue> extMessageQueue = new ArrayList<>();
for (MessageQueue messageQueue : messageQueues) {
if (!offsetTable.containsKey(messageQueue)) {
extMessageQueue.add(messageQueue);
}
}
if (extMessageQueue.size() != 0) {
log.info("no restoredOffsets for {}, so init offset for these queues", extMessageQueue);
initOffsets(extMessageQueue);
}
log.info("init offset table [{}] from restoredOffsets successful.", offsetTable);
}