in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java [113:165]
public void process(V data) throws Throwable {
Throwable throwable = errorReference.get();
if (throwable != null) {
errorReference.set(null);
throw throwable;
}
K key = this.context.getKey();
long time = this.context.getDataTime();
long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
if (time < watermark) {
//delay data.
logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
return;
}
//f(time) -> List<Window>
List<Window> windows = super.calculateWindow(windowInfo, time);
for (Window window : windows) {
logger.debug("timestamp=" + time + ". time -> window: " + Utils.format(time) + "->" + window);
//f(Window + key, store) -> oldValue
//todo key 怎么转化成对应的string,只和key的值有关系
WindowKey windowKey = new WindowKey(name, super.toHexString(key), window.getEndTime(), window.getStartTime());
WindowState<K, Accumulator<R, OV>> oldState = this.windowStore.get(windowKey);
//f(oldValue, Agg) -> newValue
Accumulator<R, OV> storeAccumulator;
if (oldState == null || oldState.getValue() == null) {
storeAccumulator = accumulator.clone();
} else {
storeAccumulator = oldState.getValue();
}
R select = selectAction.select(data);
storeAccumulator.addValue(select);
//f(Window + key, newValue, store)
WindowState<K, Accumulator<R, OV>> state = new WindowState<>(key, storeAccumulator, time);
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
this.idleWindowScaner.putAccumulatorWindowCallback(windowKey, watermark, this.accumulatorWindowFire);
}
try {
List<WindowKey> fire = this.accumulatorWindowFire.fire(name, watermark);
for (WindowKey windowKey : fire) {
this.idleWindowScaner.removeWindowKey(windowKey);
}
} catch (Throwable t) {
errorReference.compareAndSet(null, t);
}
}