core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java [198:214]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                    this::watermark);
        }

        @Override
        public void process(V data) throws Throwable {
            K key = this.context.getKey();
            long time = this.context.getDataTime();

            long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
            if (time < watermark) {
                logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
                return;
            }
            //本地存储里面搜索下
            Pair<Long, Long> newSessionWindowTime = fireIfSessionOut(key, data, time, watermark);

            if (newSessionWindowTime != null) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java [202:219]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                    this::watermark);
        }

        @Override
        public void process(V data) throws Throwable {
            K key = this.context.getKey();
            long time = this.context.getDataTime();

            long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
            if (time < watermark) {
                logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
                return;
            }

            //本地存储里面搜索下
            Pair<Long, Long> newSessionWindowTime = fireIfSessionOut(key, data, time, watermark);

            if (newSessionWindowTime != null) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



