in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/channels/ChannelConversionGraph.java [503:571]
private void kernelizeChannelRequests() {
// Check if the Junction enters a loop "from the side", i.e., across multiple iterations.
// CHECK: Since we rule out non-reusable Channels in #resolveSupportedChannels, do we really need this?
// final LoopSubplan outputLoop = this.sourceOutput.getOwner().getInnermostLoop();
// final int outputLoopDepth = this.sourceOutput.getOwner().getLoopStack().size();
// boolean isSideEnterLoop = this.destInputs.stream().anyMatch(input ->
// !input.getOwner().isLoopHead() &&
// (input.getOwner().getLoopStack().size() > outputLoopDepth ||
// (input.getOwner().getLoopStack().size() == outputLoopDepth && input.getOwner().getInnermostLoop() != outputLoop)
// )
// );
// Index the (unreached) Channel requests by their InputSlots, thereby merging equal ones.
int index = 0;
this.kernelDestChannelDescriptorSetsToIndices = new HashMap<>(this.destChannelDescriptorSets.size());
for (Set<ChannelDescriptor> destChannelDescriptorSet : this.destChannelDescriptorSets) {
final Bitmask indices = this.kernelDestChannelDescriptorSetsToIndices.computeIfAbsent(
destChannelDescriptorSet, key -> new Bitmask(this.destChannelDescriptorSets.size())
);
this.allDestinationChannelIndices.set(index);
indices.set(index++);
}
// Strip off the non-reusable, superfluous ChannelDescriptors where applicable.
Collection<Tuple<Set<ChannelDescriptor>, Bitmask>> kernelDestChannelDescriptorSetsToIndicesUpdates = new LinkedList<>();
final Iterator<Map.Entry<Set<ChannelDescriptor>, Bitmask>> iterator =
this.kernelDestChannelDescriptorSetsToIndices.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<Set<ChannelDescriptor>, Bitmask> entry = iterator.next();
final Bitmask indices = entry.getValue();
// Don't touch destination channel sets that occur only once.
if (indices.cardinality() < 2) continue;
// If there is exactly one non-reusable and more than one reusable channel, we can remove the
// non-reusable one.
Set<ChannelDescriptor> channelDescriptors = entry.getKey();
int numReusableChannels = (int) channelDescriptors.stream().filter(ChannelDescriptor::isReusable).count();
if (numReusableChannels == 0 && channelDescriptors.size() == 1) {
logger.warn(
"More than two target operators request only the non-reusable channel {}.",
WayangCollections.getSingle(channelDescriptors)
);
}
if (channelDescriptors.size() - numReusableChannels == 1) {
iterator.remove();
channelDescriptors = new HashSet<>(channelDescriptors);
channelDescriptors.removeIf(channelDescriptor -> !channelDescriptor.isReusable());
kernelDestChannelDescriptorSetsToIndicesUpdates.add(new Tuple<>(channelDescriptors, indices));
}
}
for (Tuple<Set<ChannelDescriptor>, Bitmask> channelsToIndicesChange : kernelDestChannelDescriptorSetsToIndicesUpdates) {
this.kernelDestChannelDescriptorSetsToIndices.computeIfAbsent(
channelsToIndicesChange.getField0(),
key -> new Bitmask(this.destChannelDescriptorSets.size())
).orInPlace(channelsToIndicesChange.getField1());
}
// Index the single ChannelDescriptors.
this.kernelDestChannelDescriptorsToIndices = new HashMap<>();
for (Map.Entry<Set<ChannelDescriptor>, Bitmask> entry : this.kernelDestChannelDescriptorSetsToIndices.entrySet()) {
final Set<ChannelDescriptor> channelDescriptorSet = entry.getKey();
final Bitmask indices = entry.getValue();
for (ChannelDescriptor channelDescriptor : channelDescriptorSet) {
this.kernelDestChannelDescriptorsToIndices.merge(channelDescriptor, new Bitmask(indices), Bitmask::or);
}
}
}