in core/src/main/java/org/apache/rocketmq/streams/core/window/fire/AggregateWindowFire.java [51:90]
public List<WindowKey> fire(String operatorName, long watermark) {
    // Forwards every window returned by the store's less-than-watermark search for the given
    // operator, deletes its stored state, and returns the fired window keys in firing order.
    // Any failure along the way is rethrown wrapped in an RStreamsException.
    List<WindowKey> firedKeys = new ArrayList<>();
    try {
        List<Pair<WindowKey, WindowState<K, OV>>> candidates =
            this.windowStore.searchLessThanWatermark(operatorName, watermark);

        // The last element of the search result has the smallest time and must fire first,
        // so the list is walked from the tail to the head.
        int index = candidates.size();
        while (--index >= 0) {
            Pair<WindowKey, WindowState<K, OV>> entry = candidates.get(index);
            WindowKey key = entry.getKey();
            WindowState<K, OV> state = entry.getValue();
            Long endTime = key.getWindowEnd();

            // Stamp the window boundaries onto the context header that travels with the result.
            Properties headers = this.context.getHeader();
            headers.put(Constant.WINDOW_START_TIME, key.getWindowStart());
            headers.put(Constant.WINDOW_END_TIME, endTime);

            Data<K, OV> aggregated =
                new Data<>(state.getKey(), state.getValue(), state.getRecordLastTimestamp(), headers);
            Data<K, V> outgoing = this.convert(aggregated);

            if (logger.isDebugEnabled()) {
                logger.debug("fire window, windowKey={}, search watermark={}, window: [{} - {}], data to next:[{}]",
                    key, watermark, Utils.format(key.getWindowStart()), Utils.format(endTime), outgoing);
            }

            this.context.forward(outgoing);

            // Delete the window state once its result has been forwarded.
            this.windowStore.deleteByKey(key);
            firedKeys.add(key);
        }
    } catch (Throwable cause) {
        throw new RStreamsException(
            String.format("fire window error, watermark:%s, operatorName:%s", watermark, operatorName), cause);
    }
    return firedKeys;
}