public void process()

in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java [113:167]


        public void process(V data) throws Throwable {
            Throwable throwable = errorReference.get();
            if (throwable != null) {
                errorReference.set(null);
                throw throwable;
            }

            K key = this.context.getKey();
            long time = this.context.getDataTime();

            long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
            if (time < watermark) {
                logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
                return;
            }


            //f(time) -> List<Window>
            List<Window> windows = super.calculateWindow(windowInfo, time);
            for (Window window : windows) {
                logger.debug("timestamp=" + time + ". time -> window: " + Utils.format(time) + "->" + window);

                //f(Window + key, store) -> oldValue
                //todo key 怎么转化成对应的string,只和key的值有关系
                WindowKey windowKey = new WindowKey(name, super.toHexString(key), window.getEndTime(), window.getStartTime());
                WindowState<K, OV> oldState = this.windowStore.get(windowKey);

                //f(oldValue, Agg) -> newValue
                OV oldValue;
                if (oldState == null || oldState.getValue() == null) {
                    oldValue = initAction.get();
                } else {
                    oldValue = oldState.getValue();
                }

                OV newValue = this.aggregateAction.calculate(key, data, oldValue);
                if (newValue != null && newValue.equals(oldValue)) {
                    continue;
                }

                //f(Window + key, newValue, store)
                WindowState<K, OV> state = new WindowState<>(key, newValue, time);
                this.windowStore.put(stateTopicMessageQueue, windowKey, state);
                this.idleWindowScaner.putAggregateWindowCallback(windowKey, watermark, this.aggregateWindowFire);
            }

            try {
                List<WindowKey> fire = this.aggregateWindowFire.fire(name, watermark);
                for (WindowKey windowKey : fire) {
                    this.idleWindowScaner.removeWindowKey(windowKey);
                }
            } catch (Throwable t) {
                errorReference.compareAndSet(null, t);
            }
        }