in core/src/main/java/org/apache/rocketmq/streams/core/rstream/JoinedStream.java [124:177]
public <OUT> RStream<OUT> apply(ValueJoinAction<V1, V2, OUT> joinAction) {
List<String> temp = new ArrayList<>();
WindowInfo.JoinStream joinStream = new WindowInfo.JoinStream(JoinedStream.this.joinType, null);
windowInfo.setJoinStream(joinStream);
Pipeline leftStreamPipeline = JoinedStream.this.leftStream.getPipeline();
String jobId = leftStreamPipeline.getJobId();
String name = OperatorNameMaker.makeName(OperatorNameMaker.JOIN_WINDOW_PREFIX, jobId);
Supplier<Processor<? super OUT>> supplier = new JoinWindowAggregateSupplier<>(name, windowInfo, joinAction);
ProcessorNode<OUT> commChild = new ProcessorNode(name, temp, supplier);
{
GroupedStream<K, V1> leftGroupedStream = JoinedStream.this.leftStream.keyBy(leftSelectAction);
WindowInfo leftWindowInfo = this.copy(windowInfo);
WindowInfo.JoinStream leftStream = new WindowInfo.JoinStream(JoinedStream.this.joinType, StreamType.LEFT_STREAM);
leftWindowInfo.setJoinStream(leftStream);
leftGroupedStream.window(leftWindowInfo);
GraphNode lastNode = leftStreamPipeline.getLastNode();
temp.add(lastNode.getName());
commChild.addParent(lastNode);
}
{
GroupedStream<K, V2> rightGroupedStream = JoinedStream.this.rightStream.keyBy(rightSelectAction);
WindowInfo rightWindowInfo = this.copy(windowInfo);
WindowInfo.JoinStream leftStream = new WindowInfo.JoinStream(JoinedStream.this.joinType, StreamType.RIGHT_STREAM);
rightWindowInfo.setJoinStream(leftStream);
rightGroupedStream.window(rightWindowInfo);
Pipeline rightStreamPipeline = JoinedStream.this.rightStream.getPipeline();
String rightJobId = rightStreamPipeline.getJobId();
if (!Objects.equals(jobId, rightJobId)) {
throw new IllegalStateException("left stream and right stream must have same jobId.");
}
GraphNode lastNode = rightStreamPipeline.getLastNode();
temp.add(lastNode.getName());
commChild.addParent(lastNode);
lastNode.addChild(commChild);
}
return new RStreamImpl<>(leftStreamPipeline, commChild);
}