private void kernelizeChannelRequests()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/channels/ChannelConversionGraph.java [503:571]


        private void kernelizeChannelRequests() {
            // Check if the Junction enters a loop "from the side", i.e., across multiple iterations.
            // CHECK: Since we rule out non-reusable Channels in #resolveSupportedChannels, do we really need this?
//            final LoopSubplan outputLoop = this.sourceOutput.getOwner().getInnermostLoop();
//            final int outputLoopDepth = this.sourceOutput.getOwner().getLoopStack().size();
//            boolean isSideEnterLoop = this.destInputs.stream().anyMatch(input ->
//                    !input.getOwner().isLoopHead() &&
//                            (input.getOwner().getLoopStack().size() > outputLoopDepth ||
//                                    (input.getOwner().getLoopStack().size() == outputLoopDepth && input.getOwner().getInnermostLoop() != outputLoop)
//                            )
//            );


            // Index the (unreached) Channel requests by their InputSlots, thereby merging equal ones.
            int index = 0;
            this.kernelDestChannelDescriptorSetsToIndices = new HashMap<>(this.destChannelDescriptorSets.size());
            for (Set<ChannelDescriptor> destChannelDescriptorSet : this.destChannelDescriptorSets) {
                final Bitmask indices = this.kernelDestChannelDescriptorSetsToIndices.computeIfAbsent(
                        destChannelDescriptorSet, key -> new Bitmask(this.destChannelDescriptorSets.size())
                );
                this.allDestinationChannelIndices.set(index);
                indices.set(index++);
            }

            // Strip off the non-reusable, superfluous ChannelDescriptors where applicable.
            Collection<Tuple<Set<ChannelDescriptor>, Bitmask>> kernelDestChannelDescriptorSetsToIndicesUpdates = new LinkedList<>();
            final Iterator<Map.Entry<Set<ChannelDescriptor>, Bitmask>> iterator =
                    this.kernelDestChannelDescriptorSetsToIndices.entrySet().iterator();
            while (iterator.hasNext()) {
                final Map.Entry<Set<ChannelDescriptor>, Bitmask> entry = iterator.next();
                final Bitmask indices = entry.getValue();
                // Don't touch destination channel sets that occur only once.
                if (indices.cardinality() < 2) continue;

                // If there is exactly one non-reusable and more than one reusable channel, we can remove the
                // non-reusable one.
                Set<ChannelDescriptor> channelDescriptors = entry.getKey();
                int numReusableChannels = (int) channelDescriptors.stream().filter(ChannelDescriptor::isReusable).count();
                if (numReusableChannels == 0 && channelDescriptors.size() == 1) {
                    logger.warn(
                            "More than two target operators request only the non-reusable channel {}.",
                            WayangCollections.getSingle(channelDescriptors)
                    );
                }
                if (channelDescriptors.size() - numReusableChannels == 1) {
                    iterator.remove();
                    channelDescriptors = new HashSet<>(channelDescriptors);
                    channelDescriptors.removeIf(channelDescriptor -> !channelDescriptor.isReusable());
                    kernelDestChannelDescriptorSetsToIndicesUpdates.add(new Tuple<>(channelDescriptors, indices));
                }
            }
            for (Tuple<Set<ChannelDescriptor>, Bitmask> channelsToIndicesChange : kernelDestChannelDescriptorSetsToIndicesUpdates) {
                this.kernelDestChannelDescriptorSetsToIndices.computeIfAbsent(
                        channelsToIndicesChange.getField0(),
                        key -> new Bitmask(this.destChannelDescriptorSets.size())
                ).orInPlace(channelsToIndicesChange.getField1());
            }

            // Index the single ChannelDescriptors.
            this.kernelDestChannelDescriptorsToIndices = new HashMap<>();
            for (Map.Entry<Set<ChannelDescriptor>, Bitmask> entry : this.kernelDestChannelDescriptorSetsToIndices.entrySet()) {
                final Set<ChannelDescriptor> channelDescriptorSet = entry.getKey();
                final Bitmask indices = entry.getValue();

                for (ChannelDescriptor channelDescriptor : channelDescriptorSet) {
                    this.kernelDestChannelDescriptorsToIndices.merge(channelDescriptor, new Bitmask(indices), Bitmask::or);
                }
            }
        }