public void process()

in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java [202:231]


        /**
         * Handles one incoming record for the session-window accumulator.
         *
         * <p>The record's key and data time are read from the current context. A watermark is
         * obtained from {@code time - allowDelay}; a record whose time is earlier than that
         * watermark is logged and dropped. Otherwise {@code fireIfSessionOut} is consulted, and
         * when it returns a non-null (begin, end) pair a fresh accumulator state is created,
         * stored under a new {@code WindowKey}, and registered with the idle-window scanner.
         *
         * @param data the incoming record value
         * @throws Throwable propagated from the helpers and stores invoked here
         */
        public void process(V data) throws Throwable {
            K recordKey = this.context.getKey();
            long eventTime = this.context.getDataTime();

            long currentWatermark = this.watermark(eventTime - allowDelay, stateTopicMessageQueue);
            if (eventTime < currentWatermark) {
                logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, eventTime, currentWatermark);
                return;
            }

            // Search the local store first.
            // NOTE(review): presumably this also fires/merges existing sessions — confirm in fireIfSessionOut.
            Pair<Long, Long> sessionBounds = fireIfSessionOut(recordKey, data, eventTime, currentWatermark);
            if (sessionBounds == null) {
                // No new session window was requested for this record; nothing more to do here.
                return;
            }

            // The pair carries (begin, end): key is the session begin, value is the session end.
            Long sessionBegin = sessionBounds.getKey();
            Long sessionEnd = sessionBounds.getValue();

            // Start from a copy of the template accumulator and fold in the selected value.
            Accumulator<R, OV> sessionAccumulator = accumulator.clone();
            R selected = selectAction.select(data);
            sessionAccumulator.addValue(selected);

            WindowState<K, Accumulator<R, OV>> sessionState = new WindowState<>(recordKey, sessionAccumulator, eventTime);
            if (eventTime < sessionState.getRecordEarliestTimestamp()) {
                // Update the earliest timestamp; it is used as the session window's begin
                // timestamp when the state is fired.
                sessionState.setRecordEarliestTimestamp(eventTime);
            }

            // WindowKey receives the session end before the session begin.
            WindowKey sessionWindowKey = new WindowKey(name, super.toHexString(recordKey), sessionEnd, sessionBegin);
            logger.info("new session window, with key={}, valueTime={}, sessionBegin=[{}], sessionEnd=[{}]", recordKey, Utils.format(eventTime),
                    Utils.format(sessionBegin), Utils.format(sessionEnd));

            this.windowStore.put(stateTopicMessageQueue, sessionWindowKey, sessionState);
            this.idleWindowScaner.putAccumulatorSessionWindowCallback(sessionWindowKey, currentWatermark, this.accumulatorSessionWindowFire);
        }