private void scanAndFireWindow()

in core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java [158:201]


    /**
     * Walks every tracked window in {@code lastUpdateTime2WindowKey} and fires those whose
     * projected watermark has moved past the window end.
     *
     * <p>The projected watermark is the stored watermark plus the time elapsed since the
     * entry's recorded update time. Session window types fire as soon as that projected
     * watermark exceeds the window end; the accumulator, join and aggregate window types
     * additionally require the elapsed idle time to exceed {@code maxIdleTime}.
     *
     * <p>An entry is removed from the map once firing has been attempted, whether or not
     * {@code doFire} completes normally.
     *
     * @throws UnsupportedOperationException if an entry carries a window type not handled here
     * @throws Throwable whatever {@code doFire} propagates
     */
    private void scanAndFireWindow() throws Throwable {
        Iterator<Map.Entry<WindowKey, TimeType>> entries = this.lastUpdateTime2WindowKey.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<WindowKey, TimeType> entry = entries.next();
            WindowKey key = entry.getKey();
            TimeType state = entry.getValue();

            Type windowType = state.getType();
            long lastUpdate = state.getUpdateTime();
            long idleTime = System.currentTimeMillis() - lastUpdate;

            // The window type only decides whether the idle-time gate applies.
            boolean requiresMaxIdle;
            switch (windowType) {
                case AggregateSessionWindow:
                case AccumulatorSessionWindow:
                    requiresMaxIdle = false;
                    break;
                case AccumulatorWindow:
                case JoinWindow:
                case AggregateWindow:
                    requiresMaxIdle = true;
                    break;
                default:
                    throw new UnsupportedOperationException("unknown window type: " + windowType);
            }

            long projectedWatermark = state.getWatermark() + idleTime;

            // Non-session windows must have been idle for longer than maxIdleTime.
            if (requiresMaxIdle && idleTime <= this.maxIdleTime) {
                continue;
            }
            // Nothing to do until the projected watermark passes the window end.
            if (projectedWatermark <= key.getWindowEnd()) {
                continue;
            }

            try {
                doFire(key, windowType, projectedWatermark);
            } finally {
                // Drop the entry even if firing failed.
                entries.remove();
            }
        }
    }