in core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractWindowProcessor.java [63:84]
/**
 * Reconciles the supplied watermark with the one persisted in the state store and
 * returns whichever is larger.
 *
 * <p>The persisted value is looked up under the key built by
 * {@code Utils.watermarkKeyBytes(stateTopicMessageQueue, Constant.WATERMARK_KEY)}.
 * When the supplied watermark is strictly greater than the persisted one, it is
 * written back to the state store under that key and returned; otherwise the store
 * is left untouched and the persisted value is returned.
 *
 * <p>NOTE(review): how an absent entry is decoded depends on
 * {@code Utils.bytes2Long}, which is not visible here - confirm its handling of a
 * missing value.
 *
 * @param watermark              candidate watermark
 * @param stateTopicMessageQueue message queue used both to derive the watermark key
 *                               and as the first argument of the state store write
 * @return the larger of the candidate and the persisted watermark
 * @throws RStreamsException wrapping any {@link Throwable} raised while reading from
 *                           or writing to the state store
 */
protected long watermark(long watermark, MessageQueue stateTopicMessageQueue) {
    // The key is derived before entering the try block, so a failure here is not wrapped.
    final byte[] storeKey = Utils.watermarkKeyBytes(stateTopicMessageQueue, Constant.WATERMARK_KEY);
    try {
        final StateStore store = this.context.getStateStore();
        final long persisted = Utils.bytes2Long(store.get(storeKey));

        // Not strictly greater: keep the persisted watermark and skip the write.
        if (watermark <= persisted) {
            return persisted;
        }

        store.put(stateTopicMessageQueue, storeKey, Utils.long2Bytes(watermark));
        return watermark;
    } catch (Throwable t) {
        throw new RStreamsException(t);
    }
}