in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [611:639]
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Registers the offsets union list state with the operator state store and records
    // whether the context reports a restore. On restore, the state entries are folded
    // into restoredOffsets so that each message queue ends up with its largest offset.
    log.info("initialize State ...");

    final ListStateDescriptor<Tuple2<MessageQueue, Long>> offsetsDescriptor =
            new ListStateDescriptor<>(
                    OFFSETS_STATE_NAME,
                    TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {}));
    this.unionOffsetStates =
            context.getOperatorStateStore().getUnionListState(offsetsDescriptor);
    this.restored = context.isRestored();

    // Nothing to rebuild when this is not a restore.
    if (!restored) {
        log.info("No restore state for the consumer.");
        return;
    }

    // Keep any map that is already present; only create one when none exists yet.
    if (restoredOffsets == null) {
        restoredOffsets = new ConcurrentHashMap<>();
    }

    // The state may hold several entries for the same queue; the highest offset wins.
    for (Tuple2<MessageQueue, Long> queueOffset : unionOffsetStates.get()) {
        restoredOffsets.merge(queueOffset.f0, queueOffset.f1, Long::max);
    }

    log.info(
            "Setting restore state in the consumer. Using the following offsets: {}",
            restoredOffsets);
}