in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.java [76:98]
/**
 * Folds one incoming record into the aggregate stored for the current key and
 * forwards the updated aggregate downstream.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Read the key from the context and serialize it.</li>
 *   <li>Look up the bytes stored under that key in the state store; when nothing
 *       (or an empty array) is stored, start from {@code initAction.get()},
 *       otherwise deserialize the stored bytes.</li>
 *   <li>Call {@code aggregateAction.calculate(key, data, previous)} to obtain the
 *       new aggregate.</li>
 *   <li>Serialize the new aggregate and write it back to the state store under
 *       {@code stateTopicMessageQueue}.</li>
 *   <li>Wrap key and aggregate, together with the context's data time and header,
 *       into a {@code Data}, convert it and forward it through the context.</li>
 * </ol>
 *
 * @param data the incoming record value handed to the aggregate action
 * @throws Throwable whatever the state store, the serialization helpers, the
 *                   aggregate action or the forward call throw
 */
public void process(V data) throws Throwable {
    final K currentKey = this.context.getKey();
    final byte[] serializedKey = super.object2Byte(currentKey);

    // Fetch the previously stored aggregate, if any, for this key.
    final byte[] storedBytes = stateStore.get(serializedKey);

    final OV previous;
    if (storedBytes != null && storedBytes.length != 0) {
        previous = super.byte2Object(storedBytes);
    } else {
        // Nothing usable in the store yet: seed the aggregate from the initializer.
        previous = initAction.get();
    }

    final OV aggregated = aggregateAction.calculate(currentKey, data, previous);

    // Persist the new aggregate before emitting it downstream.
    final byte[] serializedAggregate = super.object2Byte(aggregated);
    stateStore.put(this.stateTopicMessageQueue, serializedKey, serializedAggregate);

    final Data<K, OV> outgoing =
            new Data<>(currentKey, aggregated, this.context.getDataTime(), this.context.getHeader());
    final Data<K, V> converted = super.convert(outgoing);
    this.context.forward(converted);
}