in core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java [203:253]
/**
 * Fires the callback registered for {@code windowKey}, if any, and then commits the watermark.
 *
 * <p>The callback is removed from the registry that corresponds to {@code type} before it is
 * invoked. When no callback is registered under the key, nothing happens. A {@code type} that
 * matches none of the cases below is ignored as well.
 *
 * <p>For {@code JoinWindow} the operator name is split at the last occurrence of
 * {@code Constant.SPLIT}: the text before it is passed to the callback as the name and the text
 * after it is resolved through {@code StreamType.valueOf}.
 *
 * @param windowKey key the callback was registered under; also supplies the operator name
 * @param type      kind of window, selects which registry is consulted
 * @param watermark watermark handed to {@code fire} and {@code commitWatermark}
 * @throws Throwable anything raised while firing the window or committing the watermark
 */
private void doFire(WindowKey windowKey, Type type, long watermark) throws Throwable {
    final String operator = windowKey.getOperatorName();

    switch (type) {
        case AccumulatorWindow: {
            final AccumulatorWindowFire<?, ?, ?, ?> callback = this.fireWindowCallBack.remove(windowKey);
            if (callback == null) {
                break;
            }
            // Per the original author's note: writes the result out and deletes the state
            // from local and remote.
            callback.fire(operator, watermark);
            // Per the original author's note: commits the watermark to local and remote.
            callback.commitWatermark(watermark);
            break;
        }
        case AccumulatorSessionWindow: {
            final AccumulatorSessionWindowFire<?, ?, ?, ?> callback = this.fireSessionWindowCallback.remove(windowKey);
            if (callback == null) {
                break;
            }
            callback.fire(operator, watermark);
            callback.commitWatermark(watermark);
            break;
        }
        case AggregateWindow: {
            final AggregateWindowFire<?, ?, ?> callback = this.windowKeyAggregate.remove(windowKey);
            if (callback == null) {
                break;
            }
            callback.fire(operator, watermark);
            callback.commitWatermark(watermark);
            break;
        }
        case AggregateSessionWindow: {
            final AggregateSessionWindowFire<?, ?, ?> callback = this.windowKeyAggregateSession.remove(windowKey);
            if (callback == null) {
                break;
            }
            callback.fire(operator, watermark);
            callback.commitWatermark(watermark);
            break;
        }
        case JoinWindow: {
            final JoinWindowFire<?, ?, ?, ?> callback = this.fireJoinWindowCallback.remove(windowKey);
            if (callback == null) {
                break;
            }
            // The operator name carries the stream type as a suffix after the last separator.
            // NOTE(review): if the separator is absent, lastIndexOf yields -1 and the first
            // substring call throws StringIndexOutOfBoundsException - confirm the name always
            // contains it.
            final int cut = operator.lastIndexOf(Constant.SPLIT);
            final String joinName = operator.substring(0, cut);
            final String streamTypeName = operator.substring(cut + 1);
            callback.fire(joinName, watermark, StreamType.valueOf(streamTypeName));
            callback.commitWatermark(watermark);
            break;
        }
    }
}