in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/enumeration/PlanEnumeration.java [302:437]
private Collection<PlanImplementation> concatenatePartialPlansBatchwise(
OutputSlot<?> openOutputSlot,
Collection<Channel> openChannels,
Map<InputSlot<?>, PlanEnumeration> targetEnumerations,
OptimizationContext optimizationContext,
boolean isRequestBreakpoint,
PlanEnumeration concatenationEnumeration,
TimeMeasurement concatenationMeasurement) {
// Preparatory initializations.
final ChannelConversionGraph channelConversionGraph = optimizationContext.getChannelConversionGraph();
// Allocate result collector.
Collection<PlanImplementation> result = new LinkedList<>();
// Bring the InputSlots to fixed order.
List<InputSlot<?>> inputs = new ArrayList<>(targetEnumerations.keySet());
// Identify identical PlanEnumerations among the targetEnumerations and baseEnumeration.
MultiMap<PlanEnumeration, InputSlot<?>> targetEnumerationGroups = new MultiMap<>();
for (Map.Entry<InputSlot<?>, PlanEnumeration> entry : targetEnumerations.entrySet()) {
targetEnumerationGroups.putSingle(entry.getValue(), entry.getKey());
}
// Group the PlanImplementations within each enumeration group.
MultiMap<PlanEnumeration, PlanImplementation.ConcatenationGroupDescriptor> enum2concatGroup = new MultiMap<>();
MultiMap<PlanImplementation.ConcatenationGroupDescriptor, PlanImplementation.ConcatenationDescriptor>
concatGroup2concatDescriptor = new MultiMap<>();
for (Map.Entry<PlanEnumeration, Set<InputSlot<?>>> entry : targetEnumerationGroups.entrySet()) {
PlanEnumeration planEnumeration = entry.getKey();
OutputSlot<?> groupOutput = planEnumeration == this ? openOutputSlot : null;
Set<InputSlot<?>> groupInputSet = entry.getValue();
List<InputSlot<?>> groupInputs = new ArrayList<>(inputs.size());
for (InputSlot<?> input : inputs) {
groupInputs.add(groupInputSet.contains(input) ? input : null);
}
for (PlanImplementation planImplementation : planEnumeration.getPlanImplementations()) {
PlanImplementation.ConcatenationDescriptor concatDescriptor =
planImplementation.createConcatenationDescriptor(groupOutput, groupInputs);
concatGroup2concatDescriptor.putSingle(concatDescriptor.groupDescriptor, concatDescriptor);
enum2concatGroup.putSingle(planImplementation.getPlanEnumeration(), concatDescriptor.groupDescriptor);
}
}
// Handle cases where this instance is not a target enumeration.
if (!targetEnumerationGroups.containsKey(this)) {
List<InputSlot<?>> emptyGroupInputs = WayangCollections.createNullFilledArrayList(inputs.size());
for (PlanImplementation planImplementation : this.getPlanImplementations()) {
PlanImplementation.ConcatenationDescriptor concatDescriptor =
planImplementation.createConcatenationDescriptor(openOutputSlot, emptyGroupInputs);
concatGroup2concatDescriptor.putSingle(concatDescriptor.groupDescriptor, concatDescriptor);
enum2concatGroup.putSingle(planImplementation.getPlanEnumeration(), concatDescriptor.groupDescriptor);
}
}
if (logger.isInfoEnabled()) {
logger.info("Concatenating {}={} concatenation groups ({} -> {} inputs).",
enum2concatGroup.values().stream().map(groups -> String.valueOf(groups.size())).collect(Collectors.joining("*")),
enum2concatGroup.values().stream().mapToInt(Set::size).reduce(1, (a, b) -> a * b),
openOutputSlot,
targetEnumerations.size()
);
}
// Enumerate all combinations of the PlanEnumerations.
List<PlanEnumeration> orderedEnumerations = new ArrayList<>(enum2concatGroup.keySet());
orderedEnumerations.remove(this);
orderedEnumerations.add(0, this); // Make sure that the base enumeration is in the beginning.
List<Set<PlanImplementation.ConcatenationGroupDescriptor>> orderedConcatGroups = new ArrayList<>(orderedEnumerations.size());
for (PlanEnumeration enumeration : orderedEnumerations) {
orderedConcatGroups.add(enum2concatGroup.get(enumeration));
}
for (List<PlanImplementation.ConcatenationGroupDescriptor> concatGroupCombo : WayangCollections.streamedCrossProduct(orderedConcatGroups)) {
// Determine the execution output along with its OptimizationContext.
PlanImplementation.ConcatenationGroupDescriptor baseConcatGroup = concatGroupCombo.get(0);
final OutputSlot<?> execOutput = baseConcatGroup.execOutput;
Set<PlanImplementation.ConcatenationDescriptor> baseConcatDescriptors = concatGroup2concatDescriptor.get(baseConcatGroup);
final PlanImplementation innerPlanImplementation = WayangCollections.getAny(baseConcatDescriptors).execOutputPlanImplementation;
// The output should reside in the same OptimizationContext in all PlanImplementations.
assert baseConcatDescriptors.stream()
.map(cd -> cd.execOutputPlanImplementation)
.map(PlanImplementation::getOptimizationContext)
.collect(Collectors.toSet()).size() == 1;
// Determine the execution OutputSlots.
List<InputSlot<?>> execInputs = new ArrayList<>(inputs.size());
for (PlanImplementation.ConcatenationGroupDescriptor concatGroup : concatGroupCombo) {
for (Set<InputSlot<?>> execInputSet : concatGroup.execInputs) {
if (execInputSet != null) execInputs.addAll(execInputSet);
}
}
// Construct a Junction between the ExecutionOperators.
final Operator outputOperator = execOutput.getOwner();
assert outputOperator.isExecutionOperator() : String.format("Expected execution operator, found %s.", outputOperator);
TimeMeasurement channelConversionMeasurement = concatenationMeasurement == null ?
null : concatenationMeasurement.start("Channel Conversion");
final Junction junction = openChannels == null || openChannels.isEmpty() ?
channelConversionGraph.findMinimumCostJunction(
execOutput,
execInputs,
innerPlanImplementation.getOptimizationContext(),
isRequestBreakpoint
) :
channelConversionGraph.findMinimumCostJunction(
execOutput,
openChannels,
execInputs,
innerPlanImplementation.getOptimizationContext());
if (channelConversionMeasurement != null) channelConversionMeasurement.stop();
if (junction == null) continue;
// If we found a junction, then we can enumerate all PlanImplementation combinations.
final List<Set<PlanImplementation>> groupPlans = WayangCollections.map(
concatGroupCombo,
concatGroup -> {
Set<PlanImplementation.ConcatenationDescriptor> concatDescriptors = concatGroup2concatDescriptor.get(concatGroup);
Set<PlanImplementation> planImplementations = new HashSet<>(concatDescriptors.size());
for (PlanImplementation.ConcatenationDescriptor concatDescriptor : concatDescriptors) {
planImplementations.add(concatDescriptor.getPlanImplementation());
}
return planImplementations;
});
for (List<PlanImplementation> planCombo : WayangCollections.streamedCrossProduct(groupPlans)) {
PlanImplementation basePlan = planCombo.get(0);
List<PlanImplementation> targetPlans = planCombo.subList(0, planCombo.size());
PlanImplementation concatenatedPlan = basePlan.concatenate(targetPlans, junction, basePlan, concatenationEnumeration);
if (concatenatedPlan != null) {
result.add(concatenatedPlan);
}
}
}
return result;
}