public List fire()

in core/src/main/java/org/apache/rocketmq/streams/core/window/fire/AccumulatorWindowFire.java [52:95]


    /**
     * Fires every window that the window store returns for the given operator and watermark.
     *
     * <p>For each returned window, in tail-to-head order of the search result, this method:
     * writes the window start/end times into the context header, asks the window's accumulator
     * for its result, converts and forwards that result downstream, deletes the window state
     * from the store, and records the window key as fired.
     *
     * @param operatorName name of the operator whose windows are searched
     * @param watermark    watermark handed to {@code windowStore.searchLessThanWatermark}
     * @return the keys of the windows fired by this call, in firing order; empty if none fired
     * @throws RStreamsException wrapping any {@link Throwable} raised while searching,
     *                           computing, forwarding or deleting; windows already forwarded
     *                           and deleted before the failure are not rolled back here
     */
    public List<WindowKey> fire(String operatorName, long watermark) {
        List<WindowKey> fired = new ArrayList<>();

        try {
            List<Pair<WindowKey, WindowState<K, Accumulator<R, OV>>>> candidates =
                    windowStore.searchLessThanWatermark(operatorName, watermark);

            // The last element of the search result has the smallest time and must fire first,
            // so walk the list from its tail towards its head.
            int index = candidates.size();
            while (--index >= 0) {
                Pair<WindowKey, WindowState<K, Accumulator<R, OV>>> candidate = candidates.get(index);
                WindowKey firedKey = candidate.getKey();
                WindowState<K, Accumulator<R, OV>> state = candidate.getValue();
                Long endTime = firedKey.getWindowEnd();

                // Stamp the window bounds on the header before it is handed to the accumulator
                // and attached to the outgoing record.
                Properties props = context.getHeader();
                props.put(Constant.WINDOW_START_TIME, firedKey.getWindowStart());
                props.put(Constant.WINDOW_END_TIME, endTime);

                OV aggregated = state.getValue().result(props);
                Data<K, OV> aggregate = new Data<>(state.getKey(), aggregated, state.getRecordLastTimestamp(), props);
                Data<K, V> outgoing = this.convert(aggregate);

                if (logger.isDebugEnabled()) {
                    logger.debug("fire window, windowKey={}, search watermark={}, window: [{} - {}], data to next:[{}]",
                            firedKey, watermark, Utils.format(firedKey.getWindowStart()), Utils.format(endTime), outgoing);
                }

                context.forward(outgoing);

                // Delete the window state once its result has been forwarded.
                windowStore.deleteByKey(firedKey);
                fired.add(firedKey);
            }
        } catch (Throwable cause) {
            throw new RStreamsException(
                    String.format("fire window error, watermark:%s, operatorName:%s", watermark, operatorName), cause);
        }

        return fired;
    }