in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java [236:336]
private Pair<Long/*sessionBegin*/, Long/*sessionEnd*/> fireIfSessionOut(K key, V data, long dataTime, long watermark) throws Throwable {
List<Pair<WindowKey, WindowState<K, Accumulator<R, OV>>>> pairs = this.windowStore.searchMatchKeyPrefix(name);
if (pairs.size() == 0) {
return new Pair<>(dataTime, dataTime + windowInfo.getSessionTimeout().toMilliseconds());
}
logger.debug("exist session state num={}", pairs.size());
//sessionEndTime小的先触发
Iterator<Pair<WindowKey, WindowState<K, Accumulator<R, OV>>>> iterator = pairs.iterator();
int count = 0;
long lastStateSessionEnd = 0;
long maxFireSessionEnd = Long.MIN_VALUE;
while (iterator.hasNext()) {
Pair<WindowKey, WindowState<K, Accumulator<R, OV>>> pair = iterator.next();
logger.debug("exist session state{}=[{}]", count++, pair);
WindowKey windowKey = pair.getKey();
long sessionEnd = windowKey.getWindowEnd();
if (count == pairs.size()) {
lastStateSessionEnd = sessionEnd;
}
//先触发一遍,触发后从集合中删除
if (sessionEnd < watermark) {
//触发state
List<WindowKey> fire = this.accumulatorSessionWindowFire.fire(name, watermark);
for (WindowKey delete : fire) {
this.idleWindowScaner.removeWindowKey(delete);
}
iterator.remove();
maxFireSessionEnd = Long.max(sessionEnd, maxFireSessionEnd);
}
}
if (dataTime < maxFireSessionEnd) {
logger.warn("late data, discard. key=[{}], data=[{}], dataTime < maxFireSessionEnd: [{}] < [{}]", key, data, dataTime, maxFireSessionEnd);
return null;
}
boolean createNewSessionWindow = false;
WindowKey needToDelete = null;
//再次遍历,找到数据属于某个窗口,如果窗口已经关闭,则只计算新的值,如果窗口没有关闭则计算新值、更新窗口边界、存储状态、删除老值
for (int i = 0; i < pairs.size(); i++) {
Pair<WindowKey, WindowState<K, Accumulator<R, OV>>> pair = pairs.get(i);
WindowKey windowKey = pair.getKey();
WindowState<K, Accumulator<R, OV>> state = pair.getValue();
if (windowKey.getWindowEnd() < dataTime) {
createNewSessionWindow = true;
} else if (windowKey.getWindowStart() <= dataTime) {
logger.debug("data belong to exist session window.dataTime=[{}], window:[{} - {}]", dataTime, Utils.format(windowKey.getWindowStart()), Utils.format(windowKey.getWindowEnd()));
Accumulator<R, OV> value = state.getValue();
R select = selectAction.select(data);
value.addValue(select);
//更新state
state.setValue(value);
state.setRecordLastTimestamp(dataTime);
if (dataTime < state.getRecordEarliestTimestamp()) {
//更新最早时间戳,用于状态触发时候,作为session 窗口的begin时间戳
state.setRecordEarliestTimestamp(dataTime);
}
//如果是最后一个窗口,更新窗口结束时间
if (i == pairs.size() - 1) {
long mayBeSessionEnd = dataTime + windowInfo.getSessionTimeout().toMilliseconds();
if (windowKey.getWindowEnd() < mayBeSessionEnd) {
logger.debug("update exist session window, before:[{} - {}], after:[{} - {}]", Utils.format(windowKey.getWindowStart()), Utils.format(windowKey.getWindowEnd()),
Utils.format(windowKey.getWindowStart()), Utils.format(mayBeSessionEnd));
//删除老状态
needToDelete = windowKey;
//需要保存的新状态
windowKey = new WindowKey(windowKey.getOperatorName(), windowKey.getKey2String(), mayBeSessionEnd, windowKey.getWindowStart());
}
}
} else {
logger.warn("discard data: key=[{}], data=[{}], dataTime=[{}], watermark=[{}]", key, data, dataTime, watermark);
}
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, watermark, this.accumulatorSessionWindowFire);
this.idleWindowScaner.removeOldAccumulatorSession(needToDelete);
this.windowStore.deleteByKey(needToDelete);
}
if (pairs.size() == 0 || createNewSessionWindow) {
return new Pair<>(lastStateSessionEnd, dataTime + windowInfo.getSessionTimeout().toMilliseconds());
}
return null;
}