in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java [206:235]
/**
 * Handles one incoming record for the session-window aggregation.
 *
 * <p>The record's key and data time are read from the processor context. A watermark is
 * derived from {@code time - allowDelay}; a record whose time is below that watermark is
 * logged and dropped. Otherwise {@code fireIfSessionOut} is consulted, and when it returns
 * a non-null (sessionBegin, sessionEnd) pair a fresh session window is created: the
 * aggregate is seeded from {@code initAction}, the state is written to the window store,
 * and a fire callback is registered with the idle-window scanner.
 *
 * <p>NOTE(review): when {@code fireIfSessionOut} returns {@code null} this method does
 * nothing further; presumably the record was already folded into an existing session by
 * that helper — confirm against its implementation.
 *
 * @param data the record value to aggregate
 * @throws Throwable if any of the delegated actions or store operations fail
 */
public void process(V data) throws Throwable {
    final K recordKey = this.context.getKey();
    final long eventTime = this.context.getDataTime();
    final long currentWatermark = this.watermark(eventTime - allowDelay, stateTopicMessageQueue);

    // Records older than the watermark are discarded.
    if (eventTime < currentWatermark) {
        logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, eventTime, currentWatermark);
        return;
    }

    // Search the local store for this key's session.
    final Pair<Long, Long> sessionBounds = fireIfSessionOut(recordKey, data, eventTime, currentWatermark);
    if (sessionBounds == null) {
        // No new session window was requested for this record.
        return;
    }

    // Start the aggregate from the initial value and fold in the current record.
    final OV seed = this.initAction.get();
    final OV aggregated = this.aggregateAction.calculate(recordKey, data, seed);

    final WindowState<K, OV> sessionState = new WindowState<>(recordKey, aggregated, eventTime);
    if (sessionState.getRecordEarliestTimestamp() > eventTime) {
        // Update the earliest timestamp; when the state fires it is used as the
        // begin timestamp of the session window.
        sessionState.setRecordEarliestTimestamp(eventTime);
    }

    // The pair holds (sessionBegin, sessionEnd); WindowKey is given the end before the begin.
    final String hexKey = super.toHexString(recordKey);
    final Long sessionEnd = sessionBounds.getValue();
    final Long sessionBegin = sessionBounds.getKey();
    final WindowKey sessionWindowKey = new WindowKey(name, hexKey, sessionEnd, sessionBegin);

    logger.info("new session window, with key={}, valueTime={}, sessionBegin=[{}], sessionEnd=[{}]", recordKey, Utils.format(eventTime),
            Utils.format(sessionBegin), Utils.format(sessionEnd));

    // Persist the new session state, then register the callback that fires this window.
    this.windowStore.put(stateTopicMessageQueue, sessionWindowKey, sessionState);
    this.idleWindowScaner.putAggregateSessionWindowCallback(sessionWindowKey, currentWatermark, this.aggregateSessionWindowFire);
}