public void process()

in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/MultiValueChangeSupplier.java [50:66]


        public void process(T data) throws Throwable {
            Iterable<? extends VR> convert = valueMapperAction.convert(data);

            if (convert == null) {
                logger.warn("[{}] converts to null, processor returns directly", data);
                return;
            }

            for (VR item : convert) {
                if (item == null) {
                    continue;
                }
                Data<Object, VR> before = new Data<>(this.context.getKey(), item, this.context.getDataTime(), this.context.getHeader());
                Data<Object, T> result = convert(before);
                this.context.forward(result);
            }
        }