in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.java [75:100]
public void process(V data) throws Throwable {
K key = this.context.getKey();
Accumulator<R, OV> value;
byte[] keyBytes = super.object2Byte(key);
byte[] valueBytes = stateStore.get(keyBytes);
if (valueBytes == null || valueBytes.length == 0) {
value = accumulator.clone();
} else {
value = super.byte2Object(valueBytes);
}
R select = selectAction.select(data);
value.addValue(select);
OV result = value.result(null);
byte[] newValueBytes = super.object2Byte(value);
stateStore.put(this.stateTopicMessageQueue, keyBytes, newValueBytes);
Data<K, OV> temp = new Data<>(key, result, this.context.getDataTime(), this.context.getHeader());
Data<K, V> convert = super.convert(temp);
this.context.forward(convert);
}