in core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java [158:201]
/**
 * Walks every entry of {@code lastUpdateTime2WindowKey} and fires the windows that are due.
 *
 * <p>For each entry a projected watermark is computed as the recorded watermark plus the
 * wall-clock time elapsed since the entry's recorded update time. A window is due when that
 * projected watermark is strictly greater than the window end. For {@code AccumulatorWindow},
 * {@code JoinWindow} and {@code AggregateWindow} the elapsed time must additionally be
 * strictly greater than {@code maxIdleTime}; the two session window types have no such gate.
 *
 * <p>An entry that is fired is removed through the iterator whether or not {@code doFire}
 * completes normally; an exception from {@code doFire} still propagates and ends the scan.
 *
 * @throws UnsupportedOperationException if an entry carries a window type not handled here
 * @throws Throwable whatever {@code doFire} throws
 */
private void scanAndFireWindow() throws Throwable {
    Iterator<Map.Entry<WindowKey, TimeType>> entries = this.lastUpdateTime2WindowKey.entrySet().iterator();
    while (entries.hasNext()) {
        Map.Entry<WindowKey, TimeType> entry = entries.next();
        WindowKey key = entry.getKey();
        TimeType tracked = entry.getValue();
        Type windowType = tracked.getType();
        long lastUpdate = tracked.getUpdateTime();
        long elapsed = System.currentTimeMillis() - lastUpdate;

        // The switch only decides whether the max-idle gate applies; the firing logic is shared below.
        final boolean gatedByMaxIdle;
        switch (windowType) {
            case AggregateSessionWindow:
            case AccumulatorSessionWindow:
                gatedByMaxIdle = false;
                break;
            case AccumulatorWindow:
            case JoinWindow:
            case AggregateWindow:
                gatedByMaxIdle = true;
                break;
            default:
                throw new UnsupportedOperationException("unknown window type: " + windowType);
        }

        long projectedWatermark = tracked.getWatermark() + elapsed;
        boolean idleGatePassed = !gatedByMaxIdle || elapsed > this.maxIdleTime;
        if (!idleGatePassed || projectedWatermark <= key.getWindowEnd()) {
            continue;
        }

        try {
            doFire(key, windowType, projectedWatermark);
        } finally {
            // Drop the entry even when firing fails, so it is not picked up again by a later scan.
            entries.remove();
        }
    }
}