public void process()

in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.java [75:100]


        /**
         * Folds one incoming record into the accumulator kept in the state store
         * for the current context key, persists the updated accumulator and
         * forwards the accumulator's current result downstream.
         *
         * @param data the incoming record; the value to accumulate is extracted
         *             from it via {@code selectAction}
         * @throws Throwable if serialization, state access, accumulation or
         *                   forwarding fails
         */
        public void process(V data) throws Throwable {
            K currentKey = this.context.getKey();
            byte[] serializedKey = super.object2Byte(currentKey);

            // Load the accumulator previously saved for this key; when nothing
            // (or an empty payload) is stored, start from a copy of the template.
            byte[] storedState = stateStore.get(serializedKey);
            boolean hasStoredState = storedState != null && storedState.length != 0;

            Accumulator<R, OV> acc;
            if (hasStoredState) {
                acc = super.byte2Object(storedState);
            } else {
                acc = accumulator.clone();
            }

            // Extract the field of interest from the record and accumulate it.
            R selected = selectAction.select(data);
            acc.addValue(selected);

            // Take the result before serializing, in the same order as state is written.
            OV output = acc.result(null);
            byte[] serializedState = super.object2Byte(acc);

            // Persist the updated accumulator under the same key.
            stateStore.put(this.stateTopicMessageQueue, serializedKey, serializedState);

            // Wrap the result with the current key, data time and header, then emit it.
            Data<K, OV> outgoing = new Data<>(currentKey, output, this.context.getDataTime(), this.context.getHeader());
            Data<K, V> forwarded = super.convert(outgoing);

            this.context.forward(forwarded);
        }