private void createJunctionAux()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/channels/ChannelConversionGraph.java [886:941]


        /**
         * Recursively fills the given {@code junction} by walking the conversion tree rooted at {@code vertex}.
         * <p>First, every index flagged in the vertex's {@code settledIndices} gets {@code baseChannel} (or a copy of
         * it) assigned as target channel. Then, for each outgoing edge, the channel for the edge's conversion is either
         * taken from {@code existingChannels} and updated, or newly created by converting the base channel (or its
         * copy). Producers of channels that differ from {@code baseChannel} are named and registered with the
         * junction, and the recursion continues at the edge's destination with that channel as the new base.</p>
         *
         * @param vertex      the tree vertex whose settled indices and outgoing edges are processed
         * @param baseChannel the {@link Channel} assigned to the settled indices of {@code vertex} and used as input
         *                    for its outgoing conversions
         * @param junction    receives the target channels and the registered conversion producers
         */
        private void createJunctionAux(TreeVertex vertex, Channel baseChannel, Junction junction) {
            // Created lazily and at most once per invocation; shared between both loops below.
            Channel baseCopy = null;

            // Step 1: assign a target channel to every settled destination index of this vertex.
            int settledIndex = vertex.settledIndices.nextSetBit(0);
            while (settledIndex >= 0) {
                final Channel targetChannel;
                if (this.existingDestinationChannelIndices.get(settledIndex)
                        || !this.openChannelDescriptors.contains(baseChannel.getDescriptor())) {
                    // Either the destination is already existent or the base channel is not open: use it directly.
                    targetChannel = baseChannel;
                } else {
                    // A new destination on an open base channel must be served by a copy of that channel.
                    if (baseCopy == null) {
                        baseCopy = baseChannel.copy();
                    }
                    targetChannel = baseCopy;
                }
                junction.setTargetChannel(settledIndex, targetChannel);
                settledIndex = vertex.settledIndices.nextSetBit(settledIndex + 1);
            }

            // Step 2: follow the outgoing conversions and descend into their destinations.
            for (TreeEdge outEdge : vertex.outEdges) {
                // Look for a channel that is already in place for the conversion's target descriptor.
                Channel convertedChannel = this.existingChannels.get(outEdge.channelConversion.getTargetChannelDescriptor());

                if (convertedChannel != null) {
                    // Reuse the existing channel and merely update the conversion.
                    outEdge.channelConversion.update(
                            baseChannel,
                            convertedChannel,
                            junction.getOptimizationContexts(),
                            // Hacky: Inject cardinality for cases where we convert a LoopHeadOperator output.
                            junction.getOptimizationContexts().size() == 1 ? this.cardinality : null
                    );
                } else {
                    // No channel yet: create one by conversion. An open base channel is converted via its copy,
                    // otherwise the base channel itself serves as the conversion input.
                    if (baseCopy == null) {
                        if (this.openChannelDescriptors.contains(baseChannel.getDescriptor())) {
                            baseCopy = baseChannel.copy();
                        } else {
                            baseCopy = baseChannel;
                        }
                    }
                    convertedChannel = outEdge.channelConversion.convert(
                            baseCopy,
                            this.optimizationContext.getConfiguration(),
                            junction.getOptimizationContexts(),
                            // Hacky: Inject cardinality for cases where we convert a LoopHeadOperator output.
                            junction.getOptimizationContexts().size() == 1 ? this.cardinality : null
                    );
                }

                // Name the producer of any channel that is not the base channel itself and register it.
                if (convertedChannel != baseChannel) {
                    final ExecutionTask conversionTask = convertedChannel.getProducer();
                    final ExecutionOperator conversionOperator = conversionTask.getOperator();
                    conversionOperator.setName(String.format("convert %s", junction.getSourceOutput()));
                    junction.register(conversionTask);
                }

                this.createJunctionAux(outEdge.destination, convertedChannel, junction);
            }
        }