private Collection concatenatePartialPlansBatchwise()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/enumeration/PlanEnumeration.java [302:437]


    /**
     * Concatenates the {@link PlanImplementation}s of this instance (the base enumeration) with those of the
     * given target enumerations. Instead of testing every combination of {@link PlanImplementation}s separately,
     * the implementations are first bucketed by their
     * {@link PlanImplementation.ConcatenationGroupDescriptor}; a {@link Junction} is then searched once per
     * combination of such groups and reused for all {@link PlanImplementation} combinations of those groups.
     *
     * @param openOutputSlot           the {@link OutputSlot} handed to the implementations of this instance when
     *                                 creating their concatenation descriptors
     * @param openChannels             already existing {@link Channel}s to start the {@link Junction} search from;
     *                                 if {@code null} or empty, the search starts from the execution output instead
     * @param targetEnumerations       associates each {@link InputSlot} to be connected with a {@link PlanEnumeration}
     * @param optimizationContext      provides the {@link ChannelConversionGraph}
     * @param isRequestBreakpoint      forwarded to the {@link Junction} search; only used when no
     *                                 {@code openChannels} are given
     * @param concatenationEnumeration passed on to {@code PlanImplementation#concatenate(...)} for the created plans
     * @param concatenationMeasurement optional (nullable) {@link TimeMeasurement}; a "Channel Conversion"
     *                                 sub-measurement is started around each {@link Junction} search
     * @return the concatenated {@link PlanImplementation}s; empty if no combination could be concatenated
     */
    private Collection<PlanImplementation> concatenatePartialPlansBatchwise(
            OutputSlot<?> openOutputSlot,
            Collection<Channel> openChannels,
            Map<InputSlot<?>, PlanEnumeration> targetEnumerations,
            OptimizationContext optimizationContext,
            boolean isRequestBreakpoint,
            PlanEnumeration concatenationEnumeration,
            TimeMeasurement concatenationMeasurement) {

        // Preparatory initializations.
        final ChannelConversionGraph channelConversionGraph = optimizationContext.getChannelConversionGraph();

        // Allocate result collector.
        Collection<PlanImplementation> result = new LinkedList<>();

        // Bring the InputSlots to fixed order.
        List<InputSlot<?>> inputs = new ArrayList<>(targetEnumerations.keySet());

        // Identify identical PlanEnumerations among the targetEnumerations (this instance, i.e., the base
        // enumeration, may be among them).
        MultiMap<PlanEnumeration, InputSlot<?>> targetEnumerationGroups = new MultiMap<>();
        for (Map.Entry<InputSlot<?>, PlanEnumeration> entry : targetEnumerations.entrySet()) {
            targetEnumerationGroups.putSingle(entry.getValue(), entry.getKey());
        }

        // Group the PlanImplementations within each enumeration group.
        MultiMap<PlanEnumeration, PlanImplementation.ConcatenationGroupDescriptor> enum2concatGroup = new MultiMap<>();
        MultiMap<PlanImplementation.ConcatenationGroupDescriptor, PlanImplementation.ConcatenationDescriptor>
                concatGroup2concatDescriptor = new MultiMap<>();
        for (Map.Entry<PlanEnumeration, Set<InputSlot<?>>> entry : targetEnumerationGroups.entrySet()) {
            PlanEnumeration planEnumeration = entry.getKey();
            // Only the base enumeration contributes the output; all others are pure targets.
            OutputSlot<?> groupOutput = planEnumeration == this ? openOutputSlot : null;
            Set<InputSlot<?>> groupInputSet = entry.getValue();
            // Positional list aligned with "inputs": null marks InputSlots that do not belong to this enumeration.
            List<InputSlot<?>> groupInputs = new ArrayList<>(inputs.size());
            for (InputSlot<?> input : inputs) {
                groupInputs.add(groupInputSet.contains(input) ? input : null);
            }
            for (PlanImplementation planImplementation : planEnumeration.getPlanImplementations()) {
                PlanImplementation.ConcatenationDescriptor concatDescriptor =
                        planImplementation.createConcatenationDescriptor(groupOutput, groupInputs);
                concatGroup2concatDescriptor.putSingle(concatDescriptor.groupDescriptor, concatDescriptor);
                enum2concatGroup.putSingle(planImplementation.getPlanEnumeration(), concatDescriptor.groupDescriptor);
            }
        }

        // Handle cases where this instance is not a target enumeration.
        if (!targetEnumerationGroups.containsKey(this)) {
            List<InputSlot<?>> emptyGroupInputs = WayangCollections.createNullFilledArrayList(inputs.size());
            for (PlanImplementation planImplementation : this.getPlanImplementations()) {
                PlanImplementation.ConcatenationDescriptor concatDescriptor =
                        planImplementation.createConcatenationDescriptor(openOutputSlot, emptyGroupInputs);
                concatGroup2concatDescriptor.putSingle(concatDescriptor.groupDescriptor, concatDescriptor);
                enum2concatGroup.putSingle(planImplementation.getPlanEnumeration(), concatDescriptor.groupDescriptor);
            }
        }

        if (logger.isInfoEnabled()) {
            logger.info("Concatenating {}={} concatenation groups ({} -> {} inputs).",
                    enum2concatGroup.values().stream().map(groups -> String.valueOf(groups.size())).collect(Collectors.joining("*")),
                    // Multiply in long: the number of combinations can exceed the int range.
                    enum2concatGroup.values().stream().mapToLong(Set::size).reduce(1L, (a, b) -> a * b),
                    openOutputSlot,
                    targetEnumerations.size()
            );
        }

        // Enumerate all combinations of the PlanEnumerations.
        List<PlanEnumeration> orderedEnumerations = new ArrayList<>(enum2concatGroup.keySet());
        orderedEnumerations.remove(this);
        orderedEnumerations.add(0, this); // Make sure that the base enumeration is in the beginning.
        List<Set<PlanImplementation.ConcatenationGroupDescriptor>> orderedConcatGroups = new ArrayList<>(orderedEnumerations.size());
        for (PlanEnumeration enumeration : orderedEnumerations) {
            Set<PlanImplementation.ConcatenationGroupDescriptor> concatGroups = enum2concatGroup.get(enumeration);
            // The base enumeration is always put into orderedEnumerations, even if it did not register any
            // concatenation group (i.e., it has no PlanImplementations). Without any group there is nothing to
            // concatenate, so bail out instead of passing a null entry into the cross product.
            if (concatGroups == null || concatGroups.isEmpty()) {
                return result;
            }
            orderedConcatGroups.add(concatGroups);
        }
        for (List<PlanImplementation.ConcatenationGroupDescriptor> concatGroupCombo : WayangCollections.streamedCrossProduct(orderedConcatGroups)) {
            // Determine the execution output along with its OptimizationContext.
            // Index 0 is the group of the base enumeration (see ordering above).
            PlanImplementation.ConcatenationGroupDescriptor baseConcatGroup = concatGroupCombo.get(0);
            final OutputSlot<?> execOutput = baseConcatGroup.execOutput;
            Set<PlanImplementation.ConcatenationDescriptor> baseConcatDescriptors = concatGroup2concatDescriptor.get(baseConcatGroup);
            final PlanImplementation innerPlanImplementation = WayangCollections.getAny(baseConcatDescriptors).execOutputPlanImplementation;
            // The output should reside in the same OptimizationContext in all PlanImplementations.
            assert baseConcatDescriptors.stream()
                    .map(cd -> cd.execOutputPlanImplementation)
                    .map(PlanImplementation::getOptimizationContext)
                    .collect(Collectors.toSet()).size() == 1;

            // Determine the execution InputSlots.
            List<InputSlot<?>> execInputs = new ArrayList<>(inputs.size());
            for (PlanImplementation.ConcatenationGroupDescriptor concatGroup : concatGroupCombo) {
                for (Set<InputSlot<?>> execInputSet : concatGroup.execInputs) {
                    if (execInputSet != null) execInputs.addAll(execInputSet);
                }
            }

            // Construct a Junction between the ExecutionOperators.
            final Operator outputOperator = execOutput.getOwner();
            assert outputOperator.isExecutionOperator() : String.format("Expected execution operator, found %s.", outputOperator);
            TimeMeasurement channelConversionMeasurement = concatenationMeasurement == null ?
                    null : concatenationMeasurement.start("Channel Conversion");
            final Junction junction = openChannels == null || openChannels.isEmpty() ?
                    channelConversionGraph.findMinimumCostJunction(
                            execOutput,
                            execInputs,
                            innerPlanImplementation.getOptimizationContext(),
                            isRequestBreakpoint
                    ) :
                    channelConversionGraph.findMinimumCostJunction(
                            execOutput,
                            openChannels,
                            execInputs,
                            innerPlanImplementation.getOptimizationContext());
            if (channelConversionMeasurement != null) channelConversionMeasurement.stop();
            // No junction: this combination of groups cannot be connected, so skip all of its plans.
            if (junction == null) continue;

            // If we found a junction, then we can enumerate all PlanImplementation combinations.
            final List<Set<PlanImplementation>> groupPlans = WayangCollections.map(
                    concatGroupCombo,
                    concatGroup -> {
                        Set<PlanImplementation.ConcatenationDescriptor> concatDescriptors = concatGroup2concatDescriptor.get(concatGroup);
                        Set<PlanImplementation> planImplementations = new HashSet<>(concatDescriptors.size());
                        for (PlanImplementation.ConcatenationDescriptor concatDescriptor : concatDescriptors) {
                            planImplementations.add(concatDescriptor.getPlanImplementation());
                        }
                        return planImplementations;
                    });

            for (List<PlanImplementation> planCombo : WayangCollections.streamedCrossProduct(groupPlans)) {
                PlanImplementation basePlan = planCombo.get(0);
                // NOTE(review): subList(0, size) is a view of the whole combo, i.e., the base plan is also passed
                // among the targets. Presumably intended (the base enumeration can be a target itself) - confirm.
                List<PlanImplementation> targetPlans = planCombo.subList(0, planCombo.size());
                PlanImplementation concatenatedPlan = basePlan.concatenate(targetPlans, junction, basePlan, concatenationEnumeration);
                if (concatenatedPlan != null) {
                    result.add(concatenatedPlan);
                }
            }
        }

        return result;
    }