public RStream apply()

in core/src/main/java/org/apache/rocketmq/streams/core/rstream/JoinedStream.java [124:177]


        /**
         * Builds the window-join operator node that combines the left and right streams and
         * returns a stream positioned on that node.
         *
         * <p>Each side is keyed by its own select action and windowed with a separate copy of
         * {@code windowInfo} tagged as the left or the right stream. The node that is last in each
         * side's pipeline after windowing is recorded as a parent of the join node.
         *
         * @param joinAction action passed to the {@code JoinWindowAggregateSupplier} that backs the
         *                   join node
         * @param <OUT>      value type of the resulting stream
         * @return a stream built on the left stream's pipeline whose current node is the join node
         * @throws IllegalStateException if the left and right pipelines report different job ids
         */
        public <OUT> RStream<OUT> apply(ValueJoinAction<V1, V2, OUT> joinAction) {
            Pipeline leftStreamPipeline = JoinedStream.this.leftStream.getPipeline();
            Pipeline rightStreamPipeline = JoinedStream.this.rightStream.getPipeline();
            String jobId = leftStreamPipeline.getJobId();

            // Fail fast: reject mismatched jobs before windowInfo or either pipeline is modified,
            // so a rejected join does not leave partially wired streams behind.
            if (!Objects.equals(jobId, rightStreamPipeline.getJobId())) {
                throw new IllegalStateException("left stream and right stream must have same jobId.");
            }

            // The join node itself carries the join type without a stream side.
            WindowInfo.JoinStream joinStream = new WindowInfo.JoinStream(JoinedStream.this.joinType, null);
            windowInfo.setJoinStream(joinStream);

            // parentNames is handed to the node now and filled in below as each side is wired.
            List<String> parentNames = new ArrayList<>();
            String name = OperatorNameMaker.makeName(OperatorNameMaker.JOIN_WINDOW_PREFIX, jobId);
            Supplier<Processor<? super OUT>> supplier = new JoinWindowAggregateSupplier<>(name, windowInfo, joinAction);
            // Raw construction kept as in the original; the constructor's generic signature is not visible here.
            ProcessorNode<OUT> commChild = new ProcessorNode(name, parentNames, supplier);

            // Left side: key, window, then link the pipeline's last node as a parent of the join node.
            {
                GroupedStream<K, V1> leftGroupedStream = JoinedStream.this.leftStream.keyBy(leftSelectAction);

                WindowInfo leftWindowInfo = this.copy(windowInfo);
                leftWindowInfo.setJoinStream(new WindowInfo.JoinStream(JoinedStream.this.joinType, StreamType.LEFT_STREAM));

                leftGroupedStream.window(leftWindowInfo);

                // Read the last node only after keyBy/window have been applied to the left stream.
                GraphNode leftLastNode = leftStreamPipeline.getLastNode();
                parentNames.add(leftLastNode.getName());
                commChild.addParent(leftLastNode);
                // NOTE(review): unlike the right side, the left parent does not call addChild(commChild);
                // kept as in the original — confirm this asymmetry is intentional.
            }

            // Right side: same wiring, tagged as the right stream.
            {
                GroupedStream<K, V2> rightGroupedStream = JoinedStream.this.rightStream.keyBy(rightSelectAction);

                WindowInfo rightWindowInfo = this.copy(windowInfo);
                leftRightGuard:
                rightWindowInfo.setJoinStream(new WindowInfo.JoinStream(JoinedStream.this.joinType, StreamType.RIGHT_STREAM));

                rightGroupedStream.window(rightWindowInfo);

                // Read the last node only after keyBy/window have been applied to the right stream.
                GraphNode rightLastNode = rightStreamPipeline.getLastNode();
                parentNames.add(rightLastNode.getName());
                commChild.addParent(rightLastNode);

                rightLastNode.addChild(commChild);
            }

            return new RStreamImpl<>(leftStreamPipeline, commChild);
        }