private Pair fireIfSessionOut()

in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java [240:335]


        /**
         * Fires session windows that have fallen behind the watermark and then tries to fold the incoming
         * record into one of the session windows that are still stored under this operator's name.
         *
         * <p>Flow:
         * <ol>
         *   <li>Load every stored state whose key matches {@code name}. If none exist, return the bounds
         *       {@code (dataTime, dataTime + sessionTimeout)}.</li>
         *   <li>For each state whose window end is before {@code watermark}: trigger the session fire callback,
         *       unregister the fired keys from the idle-window scanner and drop the state from the local list.</li>
         *   <li>If {@code dataTime} is earlier than the largest fired session end, the record is late: log and
         *       return {@code null}.</li>
         *   <li>Visit the remaining states. A state whose window contains {@code dataTime} is aggregated with the
         *       record; if it is the last state, its window end may be extended to
         *       {@code dataTime + sessionTimeout}, in which case the state is stored under a new key and the old
         *       key is removed. Every visited state is written back to the store and re-registered with the
         *       idle-window scanner.</li>
         * </ol>
         *
         * @param key       key of the incoming record
         * @param data      value of the incoming record
         * @param dataTime  timestamp of the incoming record, compared against window bounds
         * @param watermark current watermark; states whose window end is strictly smaller are fired
         * @return the {@code (sessionBegin, sessionEnd)} bounds for a new session window when no state was found,
         *         all states were fired, or at least one remaining state ends before {@code dataTime};
         *         {@code null} when the record was discarded as late or no new session window is required
         * @throws Throwable propagated from the window store, the fire callback or the aggregate action
         */
        private Pair<Long/*sessionBegin*/, Long/*sessionEnd*/> fireIfSessionOut(K key, V data, long dataTime, long watermark) throws Throwable {
            List<Pair<WindowKey, WindowState<K, OV>>> pairs = this.windowStore.searchMatchKeyPrefix(name);

            if (pairs.isEmpty()) {
                return new Pair<>(dataTime, dataTime + windowInfo.getSessionTimeout().toMilliseconds());
            }

            logger.debug("exist session state num={}", pairs.size());

            // Original author's note: states with a smaller session end are fired first.
            Iterator<Pair<WindowKey, WindowState<K, OV>>> iterator = pairs.iterator();
            int index = 0;
            long lastStateSessionEnd = 0;
            long maxFireSessionEnd = Long.MIN_VALUE;

            while (iterator.hasNext()) {
                Pair<WindowKey, WindowState<K, OV>> pair = iterator.next();
                logger.debug("exist session state{}=[{}]", index, pair);
                index++;

                long sessionEnd = pair.getKey().getWindowEnd();

                // Remember the session end of the last stored state. It is overwritten on every iteration
                // instead of being compared against pairs.size(), because the list shrinks while states are
                // removed below and such a comparison would pick the wrong element (or none at all).
                lastStateSessionEnd = sessionEnd;

                // First pass: fire expired states and remove them from the local list.
                if (sessionEnd < watermark) {
                    List<WindowKey> fired = this.aggregateSessionWindowFire.fire(name, watermark);
                    for (WindowKey firedKey : fired) {
                        this.idleWindowScaner.removeWindowKey(firedKey);
                    }
                    iterator.remove();
                    maxFireSessionEnd = Long.max(sessionEnd, maxFireSessionEnd);
                }
            }

            if (dataTime < maxFireSessionEnd) {
                logger.warn("late data, discard. key=[{}], data=[{}], dataTime < maxFireSessionEnd: [{}] < [{}]", key, data, dataTime, maxFireSessionEnd);
                return null;
            }

            boolean createNewSessionWindow = false;
            WindowKey needToDelete = null;

            // Second pass: find the window the record belongs to. A matching window gets the new aggregate
            // value; if it is the last window its end may be extended, which requires storing the state under
            // a new key and deleting the old one.
            for (int i = 0; i < pairs.size(); i++) {
                Pair<WindowKey, WindowState<K, OV>> pair = pairs.get(i);

                WindowKey windowKey = pair.getKey();
                WindowState<K, OV> state = pair.getValue();

                if (windowKey.getWindowEnd() < dataTime) {
                    createNewSessionWindow = true;
                } else if (windowKey.getWindowStart() <= dataTime) {
                    logger.debug("data belong to exist session window.dataTime=[{}], window:[{} - {}]", dataTime, Utils.format(windowKey.getWindowStart()), Utils.format(windowKey.getWindowEnd()));
                    OV newValue = this.aggregateAction.calculate(key, data, state.getValue());

                    // Update the state with the new aggregate and timestamps.
                    state.setValue(newValue);
                    state.setRecordLastTimestamp(dataTime);
                    if (dataTime < state.getRecordEarliestTimestamp()) {
                        // Keep the earliest timestamp; per the original note it serves as the session
                        // window's begin timestamp when the state is fired.
                        state.setRecordEarliestTimestamp(dataTime);
                    }

                    // Only the last window may have its end pushed forward.
                    if (i == pairs.size() - 1) {
                        long mayBeSessionEnd = dataTime + windowInfo.getSessionTimeout().toMilliseconds();
                        if (windowKey.getWindowEnd() < mayBeSessionEnd) {
                            logger.debug("update exist session window, before:[{} - {}], after:[{} - {}]", Utils.format(windowKey.getWindowStart()), Utils.format(windowKey.getWindowEnd()),
                                    Utils.format(windowKey.getWindowStart()), Utils.format(mayBeSessionEnd));
                            // The state stored under the old key must be deleted ...
                            needToDelete = windowKey;
                            // ... and saved again under a key carrying the extended window end.
                            windowKey = new WindowKey(windowKey.getOperatorName(), windowKey.getKey2String(), mayBeSessionEnd, windowKey.getWindowStart());
                        }
                    }
                } else {
                    logger.warn("discard data: key=[{}], data=[{}], dataTime=[{}], watermark=[{}]", key, data, dataTime, watermark);
                }

                this.windowStore.put(stateTopicMessageQueue, windowKey, state);

                this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey, watermark, this.aggregateSessionWindowFire);

                // needToDelete is only set when the last window was extended; skip the cleanup otherwise
                // instead of passing null to the scanner and the store.
                if (needToDelete != null) {
                    this.idleWindowScaner.removeOldAggregateSession(needToDelete);
                    this.windowStore.deleteByKey(needToDelete);
                }
            }

            if (pairs.isEmpty() || createNewSessionWindow) {
                return new Pair<>(lastStateSessionEnd, dataTime + windowInfo.getSessionTimeout().toMilliseconds());
            }
            return null;
        }