private void doFire()

in core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java [203:253]


    private void doFire(WindowKey windowKey, Type type, long watermark) throws Throwable {
        String operatorName = windowKey.getOperatorName();

        switch (type) {
            case AccumulatorWindow: {
                AccumulatorWindowFire<?, ?, ?, ?> func = this.fireWindowCallBack.remove(windowKey);
                if (func != null) {
                    //write the result out, delete the state from local and remote
                    func.fire(operatorName, watermark);
                    //commit watermark to local and remote.
                    func.commitWatermark(watermark);
                }
                break;
            }
            case AccumulatorSessionWindow: {
                AccumulatorSessionWindowFire<?, ?, ?, ?> accumulatorSessionWindowFire = this.fireSessionWindowCallback.remove(windowKey);
                if (accumulatorSessionWindowFire != null) {
                    accumulatorSessionWindowFire.fire(operatorName, watermark);
                    accumulatorSessionWindowFire.commitWatermark(watermark);
                }
                break;
            }
            case AggregateWindow: {
                AggregateWindowFire<?, ?, ?> aggregateWindowFire = this.windowKeyAggregate.remove(windowKey);
                if (aggregateWindowFire != null) {
                    aggregateWindowFire.fire(operatorName, watermark);
                    aggregateWindowFire.commitWatermark(watermark);
                }
                break;
            }
            case AggregateSessionWindow: {
                AggregateSessionWindowFire<?, ?, ?> sessionWindowFire = this.windowKeyAggregateSession.remove(windowKey);
                if (sessionWindowFire != null) {
                    sessionWindowFire.fire(operatorName, watermark);
                    sessionWindowFire.commitWatermark(watermark);
                }
                break;
            }
            case JoinWindow: {
                JoinWindowFire<?, ?, ?, ?> joinWindowFire = this.fireJoinWindowCallback.remove(windowKey);
                if (joinWindowFire != null) {
                    String name = operatorName.substring(0, operatorName.lastIndexOf(Constant.SPLIT));
                    String streamType = operatorName.substring(operatorName.lastIndexOf(Constant.SPLIT) + 1);

                    joinWindowFire.fire(name, watermark, StreamType.valueOf(streamType));
                    joinWindowFire.commitWatermark(watermark);
                }
                break;
            }
        }
    }