public List fire()

in core/src/main/java/org/apache/rocketmq/streams/core/window/fire/AccumulatorSessionWindowFire.java [53:99]


    /**
     * Fires every session window that the window store returns for the given
     * operator and watermark.
     *
     * <p>For each returned window this method writes the window start/end times
     * into the context header, asks the accumulator for its result, forwards the
     * converted record through the context, deletes the window state from the
     * store, and records the window key as fired.
     *
     * @param operatorName name passed to the store lookup and included in the error message
     * @param watermark    watermark passed to {@code windowStore.searchLessThanWatermark}
     * @return the keys of the windows fired by this call, in the order they were processed
     * @throws RStreamsException wrapping any {@link Throwable} raised while searching or firing
     */
    public List<WindowKey> fire(String operatorName, long watermark) {
        List<WindowKey> firedKeys = new ArrayList<>();

        try {
            List<Pair<WindowKey, WindowState<K, Accumulator<R, OV>>>> candidates =
                    windowStore.searchLessThanWatermark(operatorName, watermark);

            for (Pair<WindowKey, WindowState<K, Accumulator<R, OV>>> entry : candidates) {
                WindowKey sessionKey = entry.getKey();
                WindowState<K, Accumulator<R, OV>> sessionState = entry.getValue();

                long endTime = sessionKey.getWindowEnd();
                // Long.MAX_VALUE is treated as "no earliest timestamp set";
                // in that case fall back to the start stored in the window key.
                long beginTime = sessionState.getRecordEarliestTimestamp() == Long.MAX_VALUE
                        ? sessionKey.getWindowStart()
                        : sessionState.getRecordEarliestTimestamp();

                logger.info("fire session,windowKey={}, search keyPrefix={}, window: [{} - {}]",
                        sessionKey, sessionState.getKey().toString(), Utils.format(beginTime), Utils.format(endTime));

                // Expose the window bounds through the header handed to the accumulator and downstream.
                Properties windowHeader = context.getHeader();
                windowHeader.put(Constant.WINDOW_START_TIME, beginTime);
                windowHeader.put(Constant.WINDOW_END_TIME, endTime);

                Accumulator<R, OV> accumulator = sessionState.getValue();
                OV aggregated = accumulator.result(windowHeader);

                Data<K, OV> output = new Data<>(sessionState.getKey(), aggregated, sessionState.getRecordLastTimestamp(), windowHeader);
                Data<K, V> converted = this.convert(output);

                context.forward(converted);

                // Delete the window state now that it has been forwarded.
                windowStore.deleteByKey(sessionKey);

                firedKeys.add(sessionKey);
            }
        } catch (Throwable t) {
            String message = String.format("fire session window error, name:%s", operatorName);
            throw new RStreamsException(message, t);
        }

        return firedKeys;
    }