public RStream apply()

in core/src/main/java/org/apache/rocketmq/streams/core/rstream/JoinedStream.java [68:106]


        /**
         * Creates the join operator node and links it behind both input streams.
         *
         * <p>Each side is passed through {@code keyBy} with its select action and then gets an
         * add-tag node ({@link StreamType#LEFT_STREAM} / {@link StreamType#RIGHT_STREAM}). The node
         * reported as the pipeline's last node after that step is registered as a parent of the
         * join node, and its name is appended to the parent-name list handed to the join node.
         *
         * <p>The job-id check runs before any graph node is added, so a mismatch leaves both
         * pipelines untouched.
         *
         * @param joinAction action forwarded, together with the join type, to {@code JoinAggregateSupplier}
         * @param <OUT>      element type of the resulting stream
         * @return a new {@code RStreamImpl} built from the left stream's pipeline and the join node
         * @throws IllegalStateException if the left and right pipelines report different job ids
         */
        public <OUT> RStream<OUT> apply(ValueJoinAction<V1, V2, OUT> joinAction) {
            Pipeline leftStreamPipeline = JoinedStream.this.leftStream.getPipeline();
            Pipeline rightStreamPipeline = JoinedStream.this.rightStream.getPipeline();
            String jobId = leftStreamPipeline.getJobId();

            // Fail fast: verify the precondition before mutating either pipeline, so that a
            // rejected join does not leave a dangling keyBy/add-tag chain on the left stream.
            if (!Objects.equals(jobId, rightStreamPipeline.getJobId())) {
                throw new IllegalStateException("left stream and right stream must have same jobId.");
            }

            // This list instance is given to the join node first and filled in afterwards,
            // so the very same instance must be used for the add(...) calls below.
            List<String> parentNames = new ArrayList<>();

            String name = OperatorNameMaker.makeName(OperatorNameMaker.JOIN_PREFIX, jobId);
            Supplier<Processor<? super OUT>> supplier = new JoinAggregateSupplier<>(name, joinType, joinAction);
            // NOTE(review): raw ProcessorNode construction kept from the original; the constructor's
            // generic signature is not visible here, so it is unclear whether a diamond would compile.
            ProcessorNode<OUT> commChild = new ProcessorNode(name, parentNames, supplier);

            // Left side: key, tag as LEFT_STREAM, then hook the join node to the resulting tail.
            {
                GroupedStream<K, V1> leftGroupedStream = JoinedStream.this.leftStream.keyBy(leftSelectAction);
                String addTagName = OperatorNameMaker.makeName(ADD_TAG, jobId);
                leftGroupedStream.addGraphNode(addTagName, new AddTagSupplier<>(() -> StreamType.LEFT_STREAM));

                GraphNode lastNode = leftStreamPipeline.getLastNode();
                parentNames.add(lastNode.getName());
                commChild.addParent(lastNode);
                // NOTE(review): unlike the right side, no lastNode.addChild(commChild) is issued here.
                // Preserved as in the original; confirm whether the left link is established elsewhere.
            }

            // Right side: key, tag as RIGHT_STREAM, then hook the join node to the resulting tail.
            {
                GroupedStream<K, V2> rightGroupedStream = JoinedStream.this.rightStream.keyBy(rightSelectAction);
                String addTagName = OperatorNameMaker.makeName(ADD_TAG, jobId);
                rightGroupedStream.addGraphNode(addTagName, new AddTagSupplier<>(() -> StreamType.RIGHT_STREAM));

                GraphNode lastNode = rightStreamPipeline.getLastNode();
                parentNames.add(lastNode.getName());
                commChild.addParent(lastNode);

                lastNode.addChild(commChild);
            }

            return new RStreamImpl<>(leftStreamPipeline, commChild);
        }