in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/channels/ChannelConversionGraph.java [346:425]
/**
 * Creates a new searcher. The constructor stores its arguments, looks up the cardinality and number of
 * executions of the operator that owns {@code sourceOutput}, initializes the bookkeeping for channels that
 * already exist (only when open channels are given), and prepares the destination channel requests.
 *
 * @param sourceOutput        the {@link OutputSlot} to start from; its owner is cast to {@link ExecutionOperator}
 * @param openChannels        already existing {@link Channel}s to build upon; may be {@code null} or empty, in
 *                            which case no existing channels are taken into account
 * @param destInputs          the {@link InputSlot}s to be served; the supported channels of each are resolved
 *                            via {@code resolveSupportedChannels}
 * @param optimizationContext provides the {@link OptimizationContext.OperatorContext} of the source operator
 * @param isRequestBreakpoint requested breakpoint flag; it is only retained if {@code openChannels} is {@code null}
 */
private ShortestTreeSearcher(OutputSlot<?> sourceOutput,
Collection<Channel> openChannels,
List<InputSlot<?>> destInputs,
OptimizationContext optimizationContext,
boolean isRequestBreakpoint) {
// Store relevant variables.
// The breakpoint flag is dropped as soon as openChannels is non-null.
// NOTE(review): an empty (non-null) collection also clears the flag, although it is treated like
// "no open channels" by isOpenChannelsPresent below -- confirm that this asymmetry is intended.
this.isRequestBreakpoint = isRequestBreakpoint && openChannels == null;
this.optimizationContext = optimizationContext;
// Second context built from the given one; presumably a scratch copy -- confirm against
// the DefaultOptimizationContext constructor.
this.optimizationContextCopy = new DefaultOptimizationContext(this.optimizationContext);
this.sourceOutput = sourceOutput;
this.destInputs = destInputs;
// Both null and an empty collection count as "no open channels".
final boolean isOpenChannelsPresent = openChannels != null && !openChannels.isEmpty();
// Figure out the optimization info via the sourceOutput.
final ExecutionOperator outputOperator = (ExecutionOperator) this.sourceOutput.getOwner();
final OptimizationContext.OperatorContext operatorContext = optimizationContext.getOperatorContext(outputOperator);
// Only checked when assertions are enabled; otherwise a missing context fails on the next line.
assert operatorContext != null : String.format("Optimization info for %s missing.", outputOperator);
this.cardinality = operatorContext.getOutputCardinality(this.sourceOutput.getIndex());
this.numExecutions = operatorContext.getNumExecutions();
// Figure out, if a part of the conversion is already in place and initialize accordingly.
if (isOpenChannelsPresent) {
// Take any open channel and trace it back to the source channel, i.e., the channel whose
// producer slot is the very same object as sourceOutput (reference comparison).
// NOTE(review): there is no guard for the case that the source is never reached -- presumably
// every open channel originates from sourceOutput; verify against the callers.
Channel existingChannel = WayangCollections.getAny(openChannels);
while (existingChannel.getProducerSlot() != sourceOutput) {
existingChannel = OptimizationUtils.getPredecessorChannel(existingChannel);
}
final Channel sourceChannel = existingChannel;
this.sourceChannelDescriptor = sourceChannel.getDescriptor();
// Now traverse down-stream to find already reached destinations.
// The containers are created before collectExistingChannels(...) is invoked; presumably that
// method fills them -- confirm against its implementation.
this.existingChannels = new HashMap<>();
this.existingDestinationChannels = new HashMap<>(4);
this.existingDestinationChannelIndices = new Bitmask();
this.collectExistingChannels(sourceChannel);
// Remember the descriptors of all open channels.
this.openChannelDescriptors = new HashSet<>(openChannels.size());
for (Channel openChannel : openChannels) {
this.openChannelDescriptors.add(openChannel.getDescriptor());
}
} else {
// Nothing exists yet: start from the descriptor of the operator's output channel and use empty bookkeeping.
this.sourceChannelDescriptor = outputOperator.getOutputChannelDescriptor(this.sourceOutput.getIndex());
this.existingChannels = Collections.emptyMap();
this.existingDestinationChannels = Collections.emptyMap();
this.existingDestinationChannelIndices = new Bitmask();
this.openChannelDescriptors = Collections.emptySet();
}
// Set up the destinations: one collection of supported channel descriptors per destination input.
this.destChannelDescriptorSets = WayangCollections.map(destInputs, this::resolveSupportedChannels);
// Every destination must support at least one channel (only checked when assertions are enabled).
assert this.destChannelDescriptorSets.stream().noneMatch(Collection::isEmpty);
// Presumably sets up kernelDestChannelDescriptorsToIndices and allDestinationChannelIndices, which are
// read below -- confirm against kernelizeChannelRequests().
this.kernelizeChannelRequests();
if (isOpenChannelsPresent) {
// For each already existing destination channel, determine the destination indices it covers...
// ...and mark the path of that channel via upstream traversal.
this.reachableExistingDestinationChannelIndices = new HashMap<>();
for (Channel existingDestinationChannel : this.existingDestinationChannels.values()) {
// Indices registered for this channel's descriptor, restricted to the already existing destinations.
// NOTE(review): assumes the descriptor is present in kernelDestChannelDescriptorsToIndices;
// a missing entry would fail here with a NullPointerException -- confirm.
final Bitmask channelIndices = this.kernelDestChannelDescriptorsToIndices
.get(existingDestinationChannel.getDescriptor())
.and(this.existingDestinationChannelIndices);
// Walk upstream, merging the indices into the entry of every descriptor on the way, and stop
// after handling the channel whose descriptor equals the source channel descriptor.
while (true) {
this.reachableExistingDestinationChannelIndices.compute(
existingDestinationChannel.getDescriptor(),
(k, v) -> v == null ? new Bitmask(channelIndices) : v.orInPlace(channelIndices)
);
if (existingDestinationChannel.getDescriptor().equals(this.sourceChannelDescriptor)) break;
existingDestinationChannel = OptimizationUtils.getPredecessorChannel(existingDestinationChannel);
}
}
} else {
this.reachableExistingDestinationChannelIndices = Collections.emptyMap();
}
// The destinations that are still missing: all destination indices without the already existing ones.
this.absentDestinationChannelIndices = this.allDestinationChannelIndices
.andNot(this.existingDestinationChannelIndices);
}