public void process()

in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.java [76:98]


        public void process(V data) throws Throwable {
            K key = this.context.getKey();
            OV value;

            byte[] keyBytes = super.object2Byte(key);

            byte[] valueBytes = stateStore.get(keyBytes);
            if (valueBytes == null || valueBytes.length == 0) {
                value = initAction.get();
            } else {
                value = super.byte2Object(valueBytes);
            }

            OV result = aggregateAction.calculate(key, data, value);
            byte[] newValueBytes = super.object2Byte(result);

            stateStore.put(this.stateTopicMessageQueue, keyBytes, newValueBytes);

            Data<K, OV> temp = new Data<>(key, result, this.context.getDataTime(), this.context.getHeader());
            Data<K, V> convert = super.convert(temp);

            this.context.forward(convert);
        }