in core/src/main/java/org/apache/rocketmq/streams/core/running/StreamContextImpl.java [158:179]
/**
 * Forwards one record to every child processor registered on this context.
 *
 * <p>The record's key and header are copied onto the context first; the
 * context's data time is overwritten only when the record carries a non-null
 * timestamp. Each child is then given the context via {@code preProcess(this)}
 * and fed the record value via {@code process(...)}.
 *
 * <p>The children are taken from a snapshot captured on entry, and
 * {@code childList} is reset to that snapshot after every child, whether the
 * child completed normally or threw.
 *
 * @param data the record to forward; its key, timestamp, header and value are read
 * @param <K>  key type of the forwarded record
 * @throws Throwable whatever a child's {@code preProcess} or {@code process}
 *                   throws; remaining children are not invoked in that case
 */
public <K> void forward(Data<K, V> data) throws Throwable {
    this.key = data.getKey();
    if (data.getTimestamp() != null) {
        // Keep the previous data time when the record has no timestamp.
        this.dataTime = data.getTimestamp();
    }
    this.header = data.getHeader();

    // Snapshot of the children as they are on entry.
    // NOTE(review): presumably preProcess/process re-point childList at the
    // child's own downstream processors -- confirm against Processor.
    List<Processor<V>> store = new ArrayList<>(childList);

    // Iterate the snapshot, not childList itself: childList is cleared and
    // refilled below on every pass, which would invalidate a fail-fast
    // iterator over it (ConcurrentModificationException on the second child).
    for (Processor<V> processor : store) {
        try {
            processor.preProcess(this);
            processor.process(data.getValue());
        } finally {
            // Restore the entry-time children so the next sibling (and the
            // caller) sees the same childList, even if this child threw.
            this.childList.clear();
            this.childList.addAll(store);
        }
    }
}