in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/channels/ChannelConversionGraph.java [886:941]
/**
 * Recursively populates the given {@code junction} from the tree rooted at {@code vertex}.
 * <p>
 * For the given vertex, this method first assigns a target channel to every index set in the vertex's
 * settled indices. It then walks each outgoing edge: an already existing channel for the edge's target
 * channel descriptor is reused (and the conversion is updated), otherwise the edge's channel conversion
 * is applied to create a new channel. Finally, it descends into the edge's destination vertex with that
 * channel as the new base channel.
 *
 * @param vertex      the tree vertex whose settled indices and outgoing edges are processed
 * @param baseChannel the channel that belongs to {@code vertex}; it (or a copy of it) is used as target
 *                    channel for the settled indices and as input for the outgoing channel conversions
 * @param junction    receives the target channels and the producer tasks of the conversions
 */
private void createJunctionAux(TreeVertex vertex, Channel baseChannel, Junction junction) {
    // Lazily initialized stand-in for the base channel: a copy of it if its descriptor is among the open
    // channel descriptors, otherwise (second phase only) the base channel itself. Once set, it is shared
    // by all new destinations and all new conversions of this vertex.
    Channel effectiveBase = null;

    // Phase 1: Assign the target channels for all destinations that are settled at this vertex.
    int destinationIndex = vertex.settledIndices.nextSetBit(0);
    while (destinationIndex >= 0) {
        final Channel targetChannel;
        if (this.existingDestinationChannelIndices.get(destinationIndex)
                || !this.openChannelDescriptors.contains(baseChannel.getDescriptor())) {
            // Either the destination already exists or the base channel is not open: use it directly.
            targetChannel = baseChannel;
        } else {
            // A new destination on an existent (and open) base channel must be served from a copy.
            if (effectiveBase == null) {
                effectiveBase = baseChannel.copy();
            }
            targetChannel = effectiveBase;
        }
        junction.setTargetChannel(destinationIndex, targetChannel);
        destinationIndex = vertex.settledIndices.nextSetBit(destinationIndex + 1);
    }

    // Phase 2: Follow the outgoing edges, i.e., the channel conversions departing from this vertex.
    for (TreeEdge outEdge : vertex.outEdges) {
        // Look for a channel that is already in place for the target descriptor of this conversion.
        Channel nextChannel = this.existingChannels.get(outEdge.channelConversion.getTargetChannelDescriptor());

        if (nextChannel != null) {
            // Reuse the existing channel; the conversion is only updated (note: with the original base channel).
            outEdge.channelConversion.update(
                    baseChannel,
                    nextChannel,
                    junction.getOptimizationContexts(),
                    // Hacky: Inject cardinality for cases where we convert a LoopHeadOperator output.
                    junction.getOptimizationContexts().size() == 1 ? this.cardinality : null
            );
        } else {
            // No channel yet: create one via the conversion. If the base channel is existent (and open),
            // the conversion has to consume a copy of it rather than the base channel itself.
            if (effectiveBase == null) {
                if (this.openChannelDescriptors.contains(baseChannel.getDescriptor())) {
                    effectiveBase = baseChannel.copy();
                } else {
                    effectiveBase = baseChannel;
                }
            }
            nextChannel = outEdge.channelConversion.convert(
                    effectiveBase,
                    this.optimizationContext.getConfiguration(),
                    junction.getOptimizationContexts(),
                    // Hacky: Inject cardinality for cases where we convert a LoopHeadOperator output.
                    junction.getOptimizationContexts().size() == 1 ? this.cardinality : null
            );
        }

        // Unless the conversion yielded the very same channel instance, name the operator that produces
        // the channel and register its task with the junction.
        if (nextChannel != baseChannel) {
            final ExecutionTask conversionTask = nextChannel.getProducer();
            final ExecutionOperator conversionOperator = conversionTask.getOperator();
            conversionOperator.setName(String.format("convert %s", junction.getSourceOutput()));
            junction.register(conversionTask);
        }

        // Continue with the subtree behind this edge, now based on the edge's channel.
        this.createJunctionAux(outEdge.destination, nextChannel, junction);
    }
}