public PlanEnumeration concatenate()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/enumeration/PlanEnumeration.java [199:262]


    /**
     * Concatenates this instance with the given target enumerations at the given open output slot and
     * returns the outcome as a new {@link PlanEnumeration}; this instance is only read via its getters.
     * <p>The result merges the scope, requested input slots, serving output slots, and executed tasks of
     * this instance and of all target enumerations. Afterwards, the input slots that key
     * {@code targetEnumerations} are removed from the requested input slots, and every serving entry whose
     * output slot equals {@code openOutputSlot} is removed from the serving output slots. The plan
     * implementations of the result are produced by {@code concatenatePartialPlans(...)}.</p>
     *
     * @param openOutputSlot         the output slot to concatenate at; must be among the serving output slots
     *                               of this instance (verified by an {@code assert})
     * @param openChannels           handed through to {@code concatenatePartialPlans(...)}
     * @param targetEnumerations     the enumerations to concatenate with, keyed by the input slots that get
     *                               serviced; must not be empty (verified by an {@code assert})
     * @param optimizationContext    handed through to {@code concatenatePartialPlans(...)}
     * @param enumerationMeasurement optional parent measurement; if not {@code null}, a sub-measurement named
     *                               {@code "Concatenation"} is started here and stopped before returning
     * @return the new {@link PlanEnumeration} describing the concatenation
     */
    public PlanEnumeration concatenate(OutputSlot<?> openOutputSlot,
                                       Collection<Channel> openChannels,
                                       Map<InputSlot<?>, PlanEnumeration> targetEnumerations,
                                       OptimizationContext optimizationContext,
                                       TimeMeasurement enumerationMeasurement) {

        // Check the parameters' validity.
        assert this.getServingOutputSlots().stream()
                .map(Tuple::getField0)
                .anyMatch(openOutputSlot::equals)
                : String.format("Cannot concatenate %s: it is not a served output.", openOutputSlot);
        assert !targetEnumerations.isEmpty();

        final TimeMeasurement concatenationMeasurement = enumerationMeasurement == null ?
                null :
                enumerationMeasurement.start("Concatenation");

        // Guard on the level that is actually used below, so that the summary string is only built
        // when it will be logged.
        if (logger.isDebugEnabled()) {
            StringBuilder sb = new StringBuilder();
            sb.append("Concatenating ").append(this.getPlanImplementations().size());
            for (PlanEnumeration targetEnumeration : targetEnumerations.values()) {
                sb.append("x").append(targetEnumeration.getPlanImplementations().size());
            }
            sb.append(" plan implementations.");
            logger.debug(sb.toString());
        }

        // Prepare the result instance from this instance.
        PlanEnumeration result = new PlanEnumeration();
        result.scope.addAll(this.getScope());
        result.requestedInputSlots.addAll(this.getRequestedInputSlots());
        result.servingOutputSlots.addAll(this.getServingOutputSlots());
        result.executedTasks.putAll(this.getExecutedTasks());

        // Update the result instance from the target instances.
        for (PlanEnumeration targetEnumeration : targetEnumerations.values()) {
            result.scope.addAll(targetEnumeration.getScope());
            result.requestedInputSlots.addAll(targetEnumeration.getRequestedInputSlots());
            result.servingOutputSlots.addAll(targetEnumeration.getServingOutputSlots());
            result.executedTasks.putAll(targetEnumeration.getExecutedTasks());
        }

        // NB: We remove the serviced InputSlots only here (after all additions), because a single
        // targetEnumeration might service multiple InputSlots. If this targetEnumeration is then also the
        // baseEnumeration, it might re-request already serviced InputSlots, although already deleted.
        result.requestedInputSlots.removeAll(targetEnumerations.keySet());
        result.servingOutputSlots.removeIf(slotService -> slotService.getField0().equals(openOutputSlot));

        // Create the PlanImplementations.
        result.planImplementations.addAll(this.concatenatePartialPlans(
                openOutputSlot,
                openChannels,
                targetEnumerations,
                optimizationContext,
                result,
                concatenationMeasurement
        ));

        logger.debug("Created {} plan implementations.", result.getPlanImplementations().size());
        if (concatenationMeasurement != null) concatenationMeasurement.stop();
        return result;
    }