in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/enumeration/PlanEnumeration.java [199:262]
/**
 * Concatenates this instance, via one of its served {@link OutputSlot}s, with the given target
 * {@link PlanEnumeration}s and returns the combined result as a new {@link PlanEnumeration}.
 * <p>The result's scope, requested input slots, serving output slots, and executed tasks are the union of
 * those of this instance and of all target enumerations. The connected {@link InputSlot}s (the keys of
 * {@code targetEnumerations}) are then removed from the requested input slots, and every serving entry for
 * {@code openOutputSlot} is removed from the serving output slots. The plan implementations of the result
 * are produced by {@code concatenatePartialPlans(...)}.</p>
 *
 * @param openOutputSlot         the {@link OutputSlot} to concatenate on; must be among this instance's serving
 *                               output slots (checked via {@code assert} only)
 * @param openChannels           forwarded unchanged to {@code concatenatePartialPlans(...)}; presumably already
 *                               existing {@link Channel}s fed by {@code openOutputSlot} -- TODO confirm
 * @param targetEnumerations     maps each {@link InputSlot} to be connected to the {@link PlanEnumeration}
 *                               that requests it; must not be empty (checked via {@code assert} only)
 * @param optimizationContext    forwarded unchanged to {@code concatenatePartialPlans(...)}
 * @param enumerationMeasurement optional parent {@link TimeMeasurement}; if not {@code null}, a child
 *                               measurement named {@code "Concatenation"} is started and stopped again
 *                               before returning
 * @return a new {@link PlanEnumeration} representing the concatenation
 */
public PlanEnumeration concatenate(OutputSlot<?> openOutputSlot,
                                   Collection<Channel> openChannels,
                                   Map<InputSlot<?>, PlanEnumeration> targetEnumerations,
                                   OptimizationContext optimizationContext,
                                   TimeMeasurement enumerationMeasurement) {
    // Check the parameters' validity.
    assert this.getServingOutputSlots().stream()
            .map(Tuple::getField0)
            .anyMatch(openOutputSlot::equals)
            : String.format("Cannot concatenate %s: it is not a served output.", openOutputSlot);
    assert !targetEnumerations.isEmpty();

    final TimeMeasurement concatenationMeasurement = enumerationMeasurement == null ?
            null :
            enumerationMeasurement.start("Concatenation");

    // The summary below is logged at DEBUG level, so the guard must test the DEBUG level as well;
    // otherwise the message is assembled (and thrown away) whenever only INFO is enabled.
    if (logger.isDebugEnabled()) {
        StringBuilder sb = new StringBuilder();
        sb.append("Concatenating ").append(this.getPlanImplementations().size());
        for (PlanEnumeration targetEnumeration : targetEnumerations.values()) {
            sb.append("x").append(targetEnumeration.getPlanImplementations().size());
        }
        sb.append(" plan implementations.");
        logger.debug(sb.toString());
    }

    // Prepare the result instance from this instance.
    PlanEnumeration result = new PlanEnumeration();
    result.scope.addAll(this.getScope());
    result.requestedInputSlots.addAll(this.getRequestedInputSlots());
    result.servingOutputSlots.addAll(this.getServingOutputSlots());
    result.executedTasks.putAll(this.getExecutedTasks());

    // Update the result instance from the target instances.
    for (PlanEnumeration targetEnumeration : targetEnumerations.values()) {
        result.scope.addAll(targetEnumeration.getScope());
        result.requestedInputSlots.addAll(targetEnumeration.getRequestedInputSlots());
        result.servingOutputSlots.addAll(targetEnumeration.getServingOutputSlots());
        result.executedTasks.putAll(targetEnumeration.getExecutedTasks());
    }

    // NB: The serviced InputSlots are removed only here, after all target enumerations have been merged,
    // because a single target enumeration might service multiple InputSlots. Removing them inside the loop
    // would let a later iteration (e.g., when the target enumeration is also the base enumeration) re-add
    // InputSlots that had already been removed.
    result.requestedInputSlots.removeAll(targetEnumerations.keySet());
    // The concatenated OutputSlot is no longer open in the result.
    result.servingOutputSlots.removeIf(slotService -> slotService.getField0().equals(openOutputSlot));

    // Create the PlanImplementations.
    result.planImplementations.addAll(this.concatenatePartialPlans(
            openOutputSlot,
            openChannels,
            targetEnumerations,
            optimizationContext,
            result,
            concatenationMeasurement
    ));

    logger.debug("Created {} plan implementations.", result.getPlanImplementations().size());
    if (concatenationMeasurement != null) {
        concatenationMeasurement.stop();
    }
    return result;
}