core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java [246:258]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Exposes this stream as a plain {@link RStream}.
     *
     * <p>A new {@code RStreamImpl} is created on every call; it is constructed
     * from this instance's {@code pipeline} and {@code parent} node.
     *
     * @return a newly constructed {@code RStreamImpl} wrapping the same pipeline and parent node
     */
    @Override
    public RStream<V> toRStream() {
        final RStream<V> plainStream = new RStreamImpl<>(this.pipeline, parent);
        return plainStream;
    }

    @Override
    public void sink(String topicName, KeyValueSerializer<K, V> serializer) {
        String name = OperatorNameMaker.makeName(SINK_PREFIX, pipeline.getJobId());

        SinkSupplier<K, V> sinkSupplier = new SinkSupplier<>(topicName, serializer);
        GraphNode sinkGraphNode = new SinkGraphNode<>(name, parent.getName(), topicName, sinkSupplier);

        pipeline.addVirtualSink(sinkGraphNode, parent);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



core/src/main/java/org/apache/rocketmq/streams/core/rstream/WindowStreamImpl.java [227:239]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Converts this stream view into a plain {@link RStream}.
     *
     * <p>Each invocation builds a fresh {@code RStreamImpl} from the
     * {@code pipeline} and {@code parent} node held by this instance.
     *
     * @return a newly constructed {@code RStreamImpl} over the same pipeline and parent node
     */
    @Override
    public RStream<V> toRStream() {
        final RStream<V> converted = new RStreamImpl<>(this.pipeline, parent);
        return converted;
    }

    @Override
    public void sink(String topicName, KeyValueSerializer<K, V> serializer) {
        String name = OperatorNameMaker.makeName(SINK_PREFIX, pipeline.getJobId());

        SinkSupplier<K, V> sinkSupplier = new SinkSupplier<>(topicName, serializer);
        GraphNode sinkGraphNode = new SinkGraphNode<>(name, parent.getName(), topicName, sinkSupplier);

        pipeline.addVirtualSink(sinkGraphNode, parent);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



