core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java [65:79]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        String name = OperatorNameMaker.makeName(COUNT_PREFIX, pipeline.getJobId());

        Supplier<Processor<V>> supplier = new AggregateSupplier<>(name, parent.getName(), () -> 0, (K key, V value, Integer agg) -> agg + 1);

        GraphNode graphNode;
        if (this.parent.shuffleNode()) {
            graphNode = new ShuffleProcessorNode<>(name, parent.getName(), supplier);
        } else {
            graphNode = new ProcessorNode<>(name, parent.getName(), supplier);
        }

        return this.pipeline.addGroupedStreamVirtualNode(graphNode, parent);
    }

    @Override
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java [81:95]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        String name = OperatorNameMaker.makeName(COUNT_PREFIX, pipeline.getJobId());

        Supplier<Processor<V>> supplier = new AggregateSupplier<>(name, parent.getName(), () -> 0, (K key, V value, Integer agg) -> agg + 1);

        GraphNode graphNode;
        if (this.parent.shuffleNode()) {
            graphNode = new ShuffleProcessorNode<>(name, parent.getName(), supplier);
        } else {
            graphNode = new ProcessorNode<>(name, parent.getName(), supplier);
        }

        return this.pipeline.addGroupedStreamVirtualNode(graphNode, parent);
    }

    @Override
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



