in core/src/main/java/org/apache/rocketmq/streams/core/window/fire/AggregateSessionWindowFire.java [52:95]
/**
 * Fires every session window that {@code windowStore.searchLessThanWatermark} returns for the
 * given operator and watermark.
 *
 * <p>For each returned window this method, in order: logs the window, writes the window start
 * and end times into the header obtained from the context, forwards the converted aggregate
 * through the context, and deletes the window's entry from the store.
 *
 * <p>The reported window start is the state's earliest record timestamp, unless that value is
 * {@code Long.MAX_VALUE}, in which case the start stored in the window key is used instead.
 *
 * @param operatorName name of the operator whose windows are looked up; also included in the
 *                     error message when firing fails
 * @param watermark    watermark passed to the store lookup
 * @return the keys of the windows that were forwarded and deleted, in the order processed
 * @throws RStreamsException wrapping any {@link Throwable} raised while searching, forwarding
 *                           or deleting
 */
public List<WindowKey> fire(String operatorName, long watermark) {
    List<WindowKey> firedKeys = new ArrayList<>();
    try {
        List<Pair<WindowKey, WindowState<K, OV>>> candidates = this.windowStore.searchLessThanWatermark(operatorName, watermark);
        for (Pair<WindowKey, WindowState<K, OV>> candidate : candidates) {
            WindowKey sessionKey = candidate.getKey();
            WindowState<K, OV> sessionState = candidate.getValue();

            long sessionEnd = sessionKey.getWindowEnd();
            // Long.MAX_VALUE is presumably the "no record timestamp captured" sentinel (confirm
            // against WindowState); in that case fall back to the start held by the key.
            long sessionStart = sessionState.getRecordEarliestTimestamp() == Long.MAX_VALUE
                    ? sessionKey.getWindowStart()
                    : sessionState.getRecordEarliestTimestamp();

            logger.info("fire session,windowKey={}, search keyPrefix={}, window: [{} - {}]", sessionKey,
                    sessionState.getKey().toString(), Utils.format(sessionStart), Utils.format(sessionEnd));

            // NOTE(review): the header is fetched from the context and mutated on every
            // iteration; whether getHeader() hands out a shared instance is not visible here.
            Properties header = this.context.getHeader();
            header.put(Constant.WINDOW_START_TIME, sessionStart);
            header.put(Constant.WINDOW_END_TIME, sessionEnd);

            Data<K, OV> aggregated = new Data<>(sessionState.getKey(), sessionState.getValue(),
                    sessionState.getRecordLastTimestamp(), header);
            Data<K, V> outgoing = this.convert(aggregated);
            this.context.forward(outgoing);

            // Delete the window state only after the result has been forwarded.
            this.windowStore.deleteByKey(sessionKey);
            firedKeys.add(sessionKey);
        }
    } catch (Throwable t) {
        String message = String.format("fire session window error, name:%s", operatorName);
        throw new RStreamsException(message, t);
    }
    return firedKeys;
}