in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/MultiValueChangeSupplier.java [50:66]
/**
 * Expands one incoming value into zero or more values and forwards each of them.
 *
 * <p>The value mapper is applied to {@code data}. When it yields {@code null} a warning is
 * logged and nothing is forwarded. Otherwise every non-null element of the returned iterable
 * is wrapped in a {@code Data} built from the context's current key, data time and header,
 * and handed to {@code context.forward}.
 *
 * @param data the incoming value passed to the value mapper
 * @throws Throwable declared by this method's signature; nothing is caught here, so whatever
 *                   the mapper or the forward call throws propagates to the caller
 */
public void process(T data) throws Throwable {
    Iterable<? extends VR> mappedValues = valueMapperAction.convert(data);
    if (mappedValues == null) {
        logger.warn("[{}] converts to null, processor returns directly", data);
        return;
    }

    for (VR element : mappedValues) {
        // Null elements are skipped without being forwarded.
        if (element != null) {
            // Context values are read per element, immediately before that element is forwarded.
            Data<Object, VR> wrapped = new Data<>(
                this.context.getKey(),
                element,
                this.context.getDataTime(),
                this.context.getHeader());
            // convert(...) is defined outside this block; presumably it only adapts the
            // value type parameter to the one forward expects — confirm against its definition.
            this.context.forward(convert(wrapped));
        }
    }
}