public void process()

in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java [106:130]


        /**
         * Processes one incoming record of a join stream.
         *
         * <p>Steps, in order:
         * <ol>
         *   <li>read the key, data time and header from the current context;</li>
         *   <li>obtain the watermark from {@code watermark(time - allowDelay, stateTopicMessageQueue)};</li>
         *   <li>discard the record (with a warning log) when its time is before the watermark;</li>
         *   <li>resolve the stream type from the {@code Constant.STREAM_TAG} header entry;</li>
         *   <li>store the record, then fire windows and remove every fired window key
         *       from the idle-window scanner.</li>
         * </ol>
         *
         * @param data the record payload; it is passed to {@code store} and included in log/error messages
         * @throws IllegalStateException if the header is missing, carries no stream tag,
         *                               or the tag has no stream type
         * @throws Throwable             declared by the signature; anything raised by the helpers
         *                               called here propagates unchanged
         */
        public void process(Object data) throws Throwable {
            Object key = this.context.getKey();
            long time = this.context.getDataTime();
            Properties header = this.context.getHeader();

            long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);

            // Late data: anything older than the watermark is dropped, not stored.
            if (time < watermark) {
                logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
                return;
            }

            // Guard the lookup explicitly: a missing header or tag would otherwise surface as a
            // context-free NullPointerException on the dereference below.
            Object streamTag = header == null ? null : header.get(Constant.STREAM_TAG);
            if (streamTag == null) {
                String format = String.format("Stream tag is missing from header, data:%s", data);
                throw new IllegalStateException(format);
            }

            WindowInfo.JoinStream stream = (WindowInfo.JoinStream) streamTag;
            StreamType streamType = stream.getStreamType();
            if (streamType == null) {
                String format = String.format("StreamType is empty, data:%s", data);
                throw new IllegalStateException(format);
            }

            store(key, data, time, watermark, streamType);

            // Windows returned by fire() are removed from the idle-window scanner.
            // NOTE(review): presumably so the idle scanner does not fire them a second time; confirm.
            List<WindowKey> fire = this.joinWindowFire.fire(this.name, watermark, streamType);
            for (WindowKey windowKey : fire) {
                this.idleWindowScaner.removeWindowKey(windowKey);
            }
        }