protected long watermark()

in core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractWindowProcessor.java [63:84]


    /**
     * Advances the watermark persisted for the given state-topic queue and returns the
     * effective watermark.
     *
     * <p>The stored value is looked up under the key derived from
     * {@code stateTopicMessageQueue} and {@link Constant#WATERMARK_KEY}. If the candidate
     * is strictly greater than the stored value, the candidate is written back to the state
     * store and returned; otherwise the store is left untouched and the stored value is
     * returned, so the result never moves backwards relative to what is persisted.
     *
     * @param watermark              candidate watermark
     * @param stateTopicMessageQueue queue the watermark entry belongs to; used both to build
     *                               the lookup key and as the first argument of the store write
     * @return the candidate if it exceeds the stored watermark, otherwise the stored watermark
     * @throws RStreamsException wrapping any {@link Throwable} raised while reading, decoding
     *                           or writing the watermark
     */
    protected long watermark(long watermark, MessageQueue stateTopicMessageQueue) {
        // Built outside the try block on purpose: a failure here propagates unwrapped.
        final byte[] storeKey = Utils.watermarkKeyBytes(stateTopicMessageQueue, Constant.WATERMARK_KEY);

        try {
            final StateStore store = this.context.getStateStore();

            // NOTE(review): the decoded value for a queue with no stored entry depends on
            // how Utils.bytes2Long treats the result of a missing key - confirm.
            final long persisted = Utils.bytes2Long(store.get(storeKey));

            // Not strictly newer: keep what is already stored.
            if (watermark <= persisted) {
                return persisted;
            }

            store.put(stateTopicMessageQueue, storeKey, Utils.long2Bytes(watermark));
            return watermark;
        } catch (Throwable t) {
            throw new RStreamsException(t);
        }
    }