in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/channels/ChannelConversionGraph.java [607:767]
/**
 * Recursively enumerates conversion trees rooted at {@code channelDescriptor}.
 * <p>
 * The method first checks whether {@code channelDescriptor} itself settles destinations that are not yet
 * settled, then descends along every admissible outgoing {@link ChannelConversion}, re-roots the trees
 * returned by the recursive calls at {@code channelDescriptor}, and finally merges them (individually and,
 * for reusable descriptors, in combination) into the result.
 *
 * @param visitedChannelDescriptors  descriptors on the current path; each conversion target is added before
 *                                   descending into it and removed again afterwards
 * @param channelDescriptor          the descriptor at which all returned trees are rooted
 * @param settledDestinationIndices  destination indices settled further up the path; temporarily extended
 *                                   in place while descending from a reusable descriptor and reduced again
 *                                   before the child solutions are merged
 * @param isVisitedBreakpointChannel whether a breakpoint-suitable descriptor has already been visited on the
 *                                   current path (the recursive calls OR this flag with
 *                                   {@code isSuitableForBreakpoint()} of the conversion target)
 * @return a mapping from settled destination indices to the preferred {@link Tree} settling exactly those
 */
public Map<Bitmask, Tree> enumerate(
        Set<ChannelDescriptor> visitedChannelDescriptors,
        ChannelDescriptor channelDescriptor,
        Bitmask settledDestinationIndices,
        boolean isVisitedBreakpointChannel) {

    // Result: settled indices -> preferred tree that settles them.
    final Map<Bitmask, Tree> solutions = new HashMap<>(16);

    // --- Step 1: does the current descriptor itself settle anything new? ---

    // Existing destinations that cannot be reached through this descriptor must not be counted.
    final Bitmask unreachableExistingIndices = this.existingDestinationChannelIndices.andNot(
            this.reachableExistingDestinationChannelIndices.getOrDefault(channelDescriptor, Bitmask.EMPTY_BITMASK)
    );

    // An existing channel that is not open cannot serve any of the absent destinations.
    final Bitmask blockedAbsentIndices;
    if (this.existingChannels.containsKey(channelDescriptor) && !openChannelDescriptors.contains(channelDescriptor)) {
        blockedAbsentIndices = this.absentDestinationChannelIndices;
    } else {
        blockedAbsentIndices = Bitmask.EMPTY_BITMASK;
    }

    // andNot(...) yields the bitmask that the two in-place operations then narrow down further.
    final Bitmask freshlySettledIndices = this.kernelDestChannelDescriptorsToIndices
            .getOrDefault(channelDescriptor, Bitmask.EMPTY_BITMASK)
            .andNot(settledDestinationIndices)
            .andNotInPlace(unreachableExistingIndices)
            .andNotInPlace(blockedAbsentIndices);

    if (!freshlySettledIndices.isEmpty()) {
        if (!channelDescriptor.isReusable() && freshlySettledIndices.cardinality() != 1) {
            // Non-reusable descriptor reaching several destinations: one singleton tree per destination.
            int bit = freshlySettledIndices.nextSetBit(0);
            while (bit != -1) {
                final Bitmask singleIndex = new Bitmask(bit + 1);
                singleIndex.set(bit);
                final Tree leaf = Tree.singleton(channelDescriptor, singleIndex);
                solutions.put(leaf.settledDestinationIndices, leaf);
                bit = freshlySettledIndices.nextSetBit(bit + 1);
            }
        } else {
            // Reusable descriptor (serve all destinations at once, no per-destination split) or exactly
            // one destination reached: a single singleton tree covers everything.
            // TODO: Create all possible combinations if required.
            final Tree leaf = Tree.singleton(channelDescriptor, freshlySettledIndices);
            solutions.put(leaf.settledDestinationIndices, leaf);
        }

        // Early stop: nothing is left to settle and the breakpoint requirement (if any) is already met.
        final int settleableCount = this.destChannelDescriptorSets.size() - unreachableExistingIndices.cardinality();
        if (freshlySettledIndices.cardinality() == settleableCount
                && (!this.isRequestBreakpoint || isVisitedBreakpointChannel)) {
            return solutions;
        }
    }

    // --- Step 2: descend along the outgoing conversions. ---

    // Only a reusable descriptor may "keep" its newly settled destinations while descending.
    if (channelDescriptor.isReusable()) {
        settledDestinationIndices.orInPlace(freshlySettledIndices);
    }

    final List<ChannelConversion> outgoingConversions =
            ChannelConversionGraph.this.conversions.getOrDefault(channelDescriptor, Collections.emptyList());
    final List<Collection<Tree>> childSolutionSets = new ArrayList<>(outgoingConversions.size());
    final Set<ChannelDescriptor> allowedSuccessors = this.getSuccessorChannelDescriptors(channelDescriptor);

    for (ChannelConversion conversion : outgoingConversions) {
        final ChannelDescriptor target = conversion.getTargetChannelDescriptor();

        if (allowedSuccessors != null) {
            // Successors are prescribed: ignore every conversion leading elsewhere.
            if (!allowedSuccessors.contains(target)) {
                continue;
            }
        } else if (this.isFiltered(conversion)) {
            // No prescribed successors: the conversion filter decides.
            logger.info("Filtering conversion {} between {} and {}.", conversion, this.sourceOutput, this.destInputs);
            continue;
        }

        // Do not descend into a descriptor that is already on the current path.
        if (!visitedChannelDescriptors.add(target)) {
            continue;
        }

        final Map<Bitmask, Tree> childSolutions = this.enumerate(
                visitedChannelDescriptors,
                target,
                settledDestinationIndices,
                isVisitedBreakpointChannel || target.isSuitableForBreakpoint()
        );

        // Hang every child tree below the current descriptor via the traversed conversion.
        for (Tree childTree : childSolutions.values()) {
            childTree.reroot(
                    channelDescriptor,
                    channelDescriptor.isReusable() ? freshlySettledIndices : Bitmask.EMPTY_BITMASK,
                    conversion,
                    this.getCostEstimate(conversion)
            );
        }
        if (!childSolutions.isEmpty()) {
            childSolutionSets.add(childSolutions.values());
        }

        visitedChannelDescriptors.remove(target);
    }

    // Undo the temporary extension from above (a no-op if nothing was added).
    settledDestinationIndices.andNotInPlace(freshlySettledIndices);

    // --- Step 3: merge the child solutions into the result. ---

    // With a pending breakpoint request, a child tree that already covers all destinations is sufficient;
    // pick the preferred one of those and skip the remaining merging.
    if (this.isRequestBreakpoint && !isVisitedBreakpointChannel) {
        Tree preferredCompleteTree = null;
        for (Collection<Tree> childSolutionSet : childSolutionSets) {
            for (Tree candidate : childSolutionSet) {
                if (!this.allDestinationChannelIndices.isSubmaskOf(candidate.settledDestinationIndices)) {
                    continue;
                }
                preferredCompleteTree = selectPreferredTree(preferredCompleteTree, candidate);
            }
        }
        if (preferredCompleteTree != null) {
            solutions.put(preferredCompleteTree.settledDestinationIndices, preferredCompleteTree);
            return solutions;
        }
    }

    // First, take over the solutions of each traversed conversion on their own, keeping the preferred tree
    // whenever two trees settle the same indices.
    childSolutionSets.forEach(childSolutionSet -> childSolutionSet.forEach(
            candidate -> solutions.merge(
                    candidate.settledDestinationIndices, candidate, ChannelConversionGraph.this::selectPreferredTree
            )
    ));

    // Second, only for reusable descriptors: try to combine solutions of different conversions.
    if (!channelDescriptor.isReusable()) {
        return solutions;
    }
    if (this.kernelDestChannelDescriptorSetsToIndices.size() <= 1 || childSolutionSets.size() <= 1) {
        return solutions;
    }
    if (this.destInputs.size() <= freshlySettledIndices.cardinality() + settledDestinationIndices.cardinality() + 1) {
        return solutions;
    }

    // Count the destination channel descriptor sets that are not fully settled yet.
    int unreachedSetCount = 0;
    for (Bitmask settleableIndices : this.kernelDestChannelDescriptorSetsToIndices.values()) {
        if (!settleableIndices.isSubmaskOf(settledDestinationIndices)) {
            unreachedSetCount++;
        }
    }
    // Combining only makes sense when more than one destination is left.
    if (unreachedSetCount < 2) {
        return solutions;
    }

    final Collection<List<Collection<Tree>>> setCombinations =
            WayangCollections.createPowerList(childSolutionSets, unreachedSetCount);
    for (List<Collection<Tree>> setCombination : setCombinations) {
        // A combination needs at least two child solution sets.
        if (setCombination.size() >= 2) {
            for (List<Tree> treeCombination : WayangCollections.streamedCrossProduct(setCombination)) {
                final Tree mergedTree = ChannelConversionGraph.this.mergeTrees(treeCombination);
                if (mergedTree == null) {
                    continue;
                }
                solutions.merge(
                        mergedTree.settledDestinationIndices, mergedTree, ChannelConversionGraph.this::selectPreferredTree
                );
            }
        }
    }

    return solutions;
}