core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.java [65:76]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        @Override
        public void preProcess(StreamContext<V> context) throws RecoverStateStoreThrowable {
            super.preProcess(context);
            this.stateStore = super.waitStateReplay();

            String stateTopicName = context.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
            this.stateTopicMessageQueue = new MessageQueue(stateTopicName, context.getSourceBrokerName(), context.getSourceQueueId());
        }

        @Override
        public void process(V data) throws Throwable {
            K key = this.context.getKey();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.java [66:77]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        @Override
        public void preProcess(StreamContext<V> context) throws RecoverStateStoreThrowable {
            super.preProcess(context);
            this.stateStore = super.waitStateReplay();

            String stateTopicName = context.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
            this.stateTopicMessageQueue = new MessageQueue(stateTopicName, context.getSourceBrokerName(), context.getSourceQueueId());
        }

        @Override
        public void process(V data) throws Throwable {
            K key = this.context.getKey();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



