in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java [113:167]
public void process(V data) throws Throwable {
Throwable throwable = errorReference.get();
if (throwable != null) {
errorReference.set(null);
throw throwable;
}
K key = this.context.getKey();
long time = this.context.getDataTime();
long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
if (time < watermark) {
logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
return;
}
//f(time) -> List<Window>
List<Window> windows = super.calculateWindow(windowInfo, time);
for (Window window : windows) {
logger.debug("timestamp=" + time + ". time -> window: " + Utils.format(time) + "->" + window);
//f(Window + key, store) -> oldValue
//todo key 怎么转化成对应的string,只和key的值有关系
WindowKey windowKey = new WindowKey(name, super.toHexString(key), window.getEndTime(), window.getStartTime());
WindowState<K, OV> oldState = this.windowStore.get(windowKey);
//f(oldValue, Agg) -> newValue
OV oldValue;
if (oldState == null || oldState.getValue() == null) {
oldValue = initAction.get();
} else {
oldValue = oldState.getValue();
}
OV newValue = this.aggregateAction.calculate(key, data, oldValue);
if (newValue != null && newValue.equals(oldValue)) {
continue;
}
//f(Window + key, newValue, store)
WindowState<K, OV> state = new WindowState<>(key, newValue, time);
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
this.idleWindowScaner.putAggregateWindowCallback(windowKey, watermark, this.aggregateWindowFire);
}
try {
List<WindowKey> fire = this.aggregateWindowFire.fire(name, watermark);
for (WindowKey windowKey : fire) {
this.idleWindowScaner.removeWindowKey(windowKey);
}
} catch (Throwable t) {
errorReference.compareAndSet(null, t);
}
}