public void process()

in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java [113:165]


        /**
         * Folds one record into the accumulator of every window its data time maps to,
         * then fires the windows selected by the current watermark.
         *
         * <p>Steps, in order:
         * <ol>
         *   <li>If a previous call stored a failure in {@code errorReference}, clear it and
         *       rethrow it before doing any work.</li>
         *   <li>Read the key and data time from the context and compute the watermark from
         *       {@code time - allowDelay}. A record whose time is below the watermark is
         *       logged and dropped.</li>
         *   <li>For each window returned by {@code calculateWindow}: load the stored state,
         *       reuse its accumulator (or clone the template accumulator when no state or
         *       value exists), add the selected value, write the state back, and register
         *       the window key with the idle-window scanner.</li>
         *   <li>Fire windows for the watermark and remove every fired key from the
         *       idle-window scanner. A failure in this step is not thrown here; it is stored
         *       in {@code errorReference} (only if none is stored yet) and rethrown by
         *       step 1 of a later call.</li>
         * </ol>
         *
         * @param data the record to aggregate; passed to {@code selectAction} to obtain the
         *             value added to the accumulator
         * @throws Throwable a failure stored by an earlier call, or any failure raised while
         *                   computing the watermark, calculating windows, or updating state
         */
        public void process(V data) throws Throwable {
            // Surface a failure deferred by an earlier call before processing new data.
            Throwable throwable = errorReference.get();
            if (throwable != null) {
                errorReference.set(null);
                throw throwable;
            }

            K key = this.context.getKey();
            long time = this.context.getDataTime();

            long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
            if (time < watermark) {
                //delay data.
                logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
                return;
            }

            //f(time) -> List<Window>
            List<Window> windows = super.calculateWindow(windowInfo, time);
            for (Window window : windows) {
                // Guarded so that neither the message nor Utils.format(time) is computed
                // per window when debug logging is disabled.
                if (logger.isDebugEnabled()) {
                    logger.debug("timestamp={}. time -> window: {}->{}", time, Utils.format(time), window);
                }

                //f(Window + key, store) -> oldValue
                //TODO: how should the key be converted to its string form so that it depends only on the key's value?
                WindowKey windowKey = new WindowKey(name, super.toHexString(key), window.getEndTime(), window.getStartTime());
                WindowState<K, Accumulator<R, OV>> oldState = this.windowStore.get(windowKey);

                //f(oldValue, Agg) -> newValue
                Accumulator<R, OV> storeAccumulator;
                if (oldState == null || oldState.getValue() == null) {
                    // No stored accumulator for this window/key: start from a copy of the template.
                    storeAccumulator = accumulator.clone();
                } else {
                    storeAccumulator = oldState.getValue();
                }

                R select = selectAction.select(data);
                storeAccumulator.addValue(select);

                //f(Window + key, newValue, store)
                WindowState<K, Accumulator<R, OV>> state = new WindowState<>(key, storeAccumulator, time);
                this.windowStore.put(stateTopicMessageQueue, windowKey, state);
                this.idleWindowScaner.putAccumulatorWindowCallback(windowKey, watermark, this.accumulatorWindowFire);
            }

            try {
                List<WindowKey> fire = this.accumulatorWindowFire.fire(name, watermark);
                for (WindowKey windowKey : fire) {
                    // Fired windows no longer need idle-scanner tracking.
                    this.idleWindowScaner.removeWindowKey(windowKey);
                }
            } catch (Throwable t) {
                // Defer the failure: keep only the first one and rethrow it on a later call.
                errorReference.compareAndSet(null, t);
            }
        }