in core/src/main/java/org/apache/rocketmq/streams/core/rstream/JoinedStream.java [68:106]
/**
 * Creates the join processor node and wires it beneath the last node of both
 * the left and the right stream.
 *
 * <p>Each side is first keyed with its select action and then extended with an
 * add-tag node supplying {@code StreamType.LEFT_STREAM} or
 * {@code StreamType.RIGHT_STREAM}. The last node of each pipeline is then linked
 * to the join node in both directions (parent and child).
 *
 * @param joinAction action handed to the {@code JoinAggregateSupplier} together
 *                   with the join type
 * @param <OUT>      value type of the resulting stream
 * @return a stream built on the left stream's pipeline with the join node as parent
 * @throws IllegalStateException if the left and right pipelines report different job ids
 */
public <OUT> RStream<OUT> apply(ValueJoinAction<V1, V2, OUT> joinAction) {
    Pipeline leftStreamPipeline = JoinedStream.this.leftStream.getPipeline();
    Pipeline rightStreamPipeline = JoinedStream.this.rightStream.getPipeline();
    String jobId = leftStreamPipeline.getJobId();
    String rightJobId = rightStreamPipeline.getJobId();

    // Validate up front so a mismatch is reported before any node is appended
    // to the left stream.
    if (!Objects.equals(jobId, rightJobId)) {
        throw new IllegalStateException("left stream and right stream must have same jobId.");
    }

    // Names of the join node's parents; filled in below, after the list has
    // already been handed to the node.
    List<String> parentNames = new ArrayList<>();
    String name = OperatorNameMaker.makeName(OperatorNameMaker.JOIN_PREFIX, jobId);
    Supplier<Processor<? super OUT>> supplier = new JoinAggregateSupplier<>(name, joinType, joinAction);
    // Raw construction kept from the original.
    // NOTE(review): a diamond may not infer against the wildcarded supplier type - confirm before changing.
    ProcessorNode<OUT> commChild = new ProcessorNode(name, parentNames, supplier);

    {
        GroupedStream<K, V1> leftGroupedStream = JoinedStream.this.leftStream.keyBy(leftSelectAction);
        String addTagName = OperatorNameMaker.makeName(ADD_TAG, jobId);
        leftGroupedStream.addGraphNode(addTagName, new AddTagSupplier<>(() -> StreamType.LEFT_STREAM));

        GraphNode lastNode = leftStreamPipeline.getLastNode();
        parentNames.add(lastNode.getName());
        commChild.addParent(lastNode);
        // Register the link in both directions, exactly as done for the right side.
        lastNode.addChild(commChild);
    }

    {
        GroupedStream<K, V2> rightGroupedStream = JoinedStream.this.rightStream.keyBy(rightSelectAction);
        String addTagName = OperatorNameMaker.makeName(ADD_TAG, jobId);
        rightGroupedStream.addGraphNode(addTagName, new AddTagSupplier<>(() -> StreamType.RIGHT_STREAM));

        GraphNode lastNode = rightStreamPipeline.getLastNode();
        parentNames.add(lastNode.getName());
        commChild.addParent(lastNode);
        lastNode.addChild(commChild);
    }

    return new RStreamImpl<>(leftStreamPipeline, commChild);
}