private ShortestTreeSearcher()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/channels/ChannelConversionGraph.java [346:425]


        private ShortestTreeSearcher(OutputSlot<?> sourceOutput,
                                     Collection<Channel> openChannels,
                                     List<InputSlot<?>> destInputs,
                                     OptimizationContext optimizationContext,
                                     boolean isRequestBreakpoint) {

            // Store relevant variables.
            this.isRequestBreakpoint = isRequestBreakpoint && openChannels == null; // No breakpoints requestable.
            this.optimizationContext = optimizationContext;
            this.optimizationContextCopy = new DefaultOptimizationContext(this.optimizationContext);
            this.sourceOutput = sourceOutput;
            this.destInputs = destInputs;
            final boolean isOpenChannelsPresent = openChannels != null && !openChannels.isEmpty();

            // Figure out the optimization info via the sourceOutput.
            final ExecutionOperator outputOperator = (ExecutionOperator) this.sourceOutput.getOwner();
            final OptimizationContext.OperatorContext operatorContext = optimizationContext.getOperatorContext(outputOperator);
            assert operatorContext != null : String.format("Optimization info for %s missing.", outputOperator);
            this.cardinality = operatorContext.getOutputCardinality(this.sourceOutput.getIndex());
            this.numExecutions = operatorContext.getNumExecutions();

            // Figure out, if a part of the conversion is already in place and initialize accordingly.

            if (isOpenChannelsPresent) {
                // Take any open channel and trace it back to the source channel.
                Channel existingChannel = WayangCollections.getAny(openChannels);
                while (existingChannel.getProducerSlot() != sourceOutput) {
                    existingChannel = OptimizationUtils.getPredecessorChannel(existingChannel);
                }
                final Channel sourceChannel = existingChannel;
                this.sourceChannelDescriptor = sourceChannel.getDescriptor();

                // Now traverse down-stream to find already reached destinations.
                this.existingChannels = new HashMap<>();
                this.existingDestinationChannels = new HashMap<>(4);
                this.existingDestinationChannelIndices = new Bitmask();

                this.collectExistingChannels(sourceChannel);
                this.openChannelDescriptors = new HashSet<>(openChannels.size());
                for (Channel openChannel : openChannels) {
                    this.openChannelDescriptors.add(openChannel.getDescriptor());
                }
            } else {
                this.sourceChannelDescriptor = outputOperator.getOutputChannelDescriptor(this.sourceOutput.getIndex());
                this.existingChannels = Collections.emptyMap();
                this.existingDestinationChannels = Collections.emptyMap();
                this.existingDestinationChannelIndices = new Bitmask();
                this.openChannelDescriptors = Collections.emptySet();
            }


            // Set up the destinations.
            this.destChannelDescriptorSets = WayangCollections.map(destInputs, this::resolveSupportedChannels);
            assert this.destChannelDescriptorSets.stream().noneMatch(Collection::isEmpty);
            this.kernelizeChannelRequests();

            if (isOpenChannelsPresent) {
                // Update the bitmask of all already reached destination channels...
                // ...and mark the paths of already reached destination channels via upstream traversal.
                this.reachableExistingDestinationChannelIndices = new HashMap<>();
                for (Channel existingDestinationChannel : this.existingDestinationChannels.values()) {
                    final Bitmask channelIndices = this.kernelDestChannelDescriptorsToIndices
                            .get(existingDestinationChannel.getDescriptor())
                            .and(this.existingDestinationChannelIndices);
                    while (true) {
                        this.reachableExistingDestinationChannelIndices.compute(
                                existingDestinationChannel.getDescriptor(),
                                (k, v) -> v == null ? new Bitmask(channelIndices) : v.orInPlace(channelIndices)
                        );
                        if (existingDestinationChannel.getDescriptor().equals(this.sourceChannelDescriptor)) break;
                        existingDestinationChannel = OptimizationUtils.getPredecessorChannel(existingDestinationChannel);
                    }
                }
            } else {
                this.reachableExistingDestinationChannelIndices = Collections.emptyMap();
            }

            this.absentDestinationChannelIndices = this.allDestinationChannelIndices
                    .andNot(this.existingDestinationChannelIndices);
        }