private void store()

in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java [133:155]


        private void store(Object key, Object data, long time, long watermark, StreamType streamType) throws Throwable {
            String name = Utils.buildKey(this.name, streamType.name());
            List<Window> windows = super.calculateWindow(windowInfo, time);
            for (Window window : windows) {
                logger.debug("timestamp=" + time + ". time -> window: " + Utils.format(time) + "->" + window);

                WindowKey windowKey = new WindowKey(name, super.toHexString(key), window.getEndTime(), window.getStartTime());

                switch (streamType) {
                    case LEFT_STREAM:
                        WindowState<K, V1> leftState = new WindowState<>((K) key, (V1) data, time);
                        this.leftWindowStore.put(stateTopicMessageQueue, windowKey, leftState);
                        this.idleWindowScaner.putJoinWindowCallback(windowKey, watermark, joinWindowFire);
                        break;
                    case RIGHT_STREAM:
                        WindowState<K, V2> rightState = new WindowState<>((K) key, (V2) data, time);
                        this.rightWindowStore.put(stateTopicMessageQueue, windowKey, rightState);
                        this.idleWindowScaner.putJoinWindowCallback(windowKey, watermark, joinWindowFire);
                        break;
                }
            }

        }