public void initializeState()

in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [624:652]


    // Registers the union list state that holds (message queue, offset) pairs under
    // OFFSETS_STATE_NAME and, when the context reports a restore, folds those pairs into
    // restoredOffsets, keeping the largest offset seen for each queue.
    //
    // context: initialization context supplying the operator state store and the
    //          restored flag.
    // throws Exception: propagated from the state store lookup or from reading the state.
    public void initializeState(FunctionInitializationContext context) throws Exception {
        log.info("initialize State ...");

        this.unionOffsetStates =
                context.getOperatorStateStore()
                        .getUnionListState(
                                new ListStateDescriptor<>(
                                        OFFSETS_STATE_NAME,
                                        TypeInformation.of(
                                                new TypeHint<Tuple2<MessageQueue, Long>>() {})));
        this.restored = context.isRestored();

        if (!restored) {
            log.info("No restore state for the consumer.");
            return;
        }

        // Keep any map that already exists; only create one when nothing has been set yet.
        if (restoredOffsets == null) {
            restoredOffsets = new ConcurrentHashMap<>();
        }

        // The same queue can show up more than once in the state (presumably because union
        // state hands every subtask the full list on restore -- confirm against Flink docs),
        // so retain the highest offset per queue. merge() does the lookup and update in a
        // single call instead of separate containsKey/get/put steps.
        for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
            restoredOffsets.merge(mqOffsets.f0, mqOffsets.f1, Long::max);
        }

        log.info(
                "Setting restore state in the consumer. Using the following offsets: {}",
                restoredOffsets);
    }