private boolean physOptimizeOp()

in algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java [191:330]


    /**
     * Recursively enforces, on the inputs of the operator held by {@code opRef}, the physical properties that
     * this operator requires from them.
     * <p>
     * The method proceeds in this order:
     * <ol>
     * <li>Each input is optimized recursively against the requirement this operator has for that input, its
     * delivered physical properties are computed, and a node domain common to all inputs is derived (falling
     * back to {@code context.getComputationNodeDomain()} when two inputs report different domains).</li>
     * <li>Every required partitioning property that has no node domain yet is assigned that common domain.</li>
     * <li>For each input, starting at the index returned by {@code getStartChildIndex}, the requirement is
     * coordinated with the first ordered/unordered-partitioned property delivered so far, and whatever the
     * input does not satisfy is handed to {@code addEnforcers}.</li>
     * <li>Nested plans of the operator, if any, are optimized through {@code physOptimizePlan}.</li>
     * <li>If the operator was found to be a redundant sort, {@code opRef} is re-pointed to the operator's input
     * (skipping a PROJECT directly below it), the sort's annotations are copied over, and the new content of
     * {@code opRef} is optimized again.</li>
     * </ol>
     *
     * @param opRef
     *            reference to the operator to optimize; its value is replaced when a redundant sort is removed
     * @param required
     *            properties required from this operator; passed to
     *            {@code getRequiredPhysicalPropertiesForChildren} and to the optimization of nested plans
     * @param nestedPlan
     *            flag forwarded to the helpers and to recursive calls; nested plans are optimized with
     *            {@code true}
     * @param context
     *            the optimization context
     * @return {@code true} if a recursive call reported a change, an input had unsatisfied required properties,
     *         or a redundant sort was removed; {@code false} otherwise
     * @throws AlgebricksException
     *             propagated from the helper and recursive calls
     */
    private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, IPhysicalPropertiesVector required,
            boolean nestedPlan, IOptimizationContext context) throws AlgebricksException {

        boolean changed = false;
        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
        optimizeUsingConstraintsAndEquivClasses(op);
        PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required, context);
        IPhysicalPropertiesVector[] reqdProperties = null;
        if (pr != null) {
            reqdProperties = pr.getRequiredProperties();
        }
        // NOTE(review): pr may be null here, yet reqdProperties and pr are dereferenced below for every input.
        // This presumably relies on operators that have inputs never returning null requirements -- confirm.
        boolean opIsRedundantSort = false;

        // compute properties and figure out the domain
        INodeDomain childrenDomain = null;
        {
            int j = 0;
            for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
                AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
                // recursive call
                if (physOptimizeOp(childRef, reqdProperties[j], nestedPlan, context)) {
                    changed = true;
                }
                child.computeDeliveredPhysicalProperties(context);
                IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
                if (childrenDomain == null) {
                    childrenDomain = delivered.getPartitioningProperty().getNodeDomain();
                } else {
                    // Inputs that disagree on their domain fall back to the computation node domain.
                    INodeDomain dom2 = delivered.getPartitioningProperty().getNodeDomain();
                    if (!childrenDomain.sameAs(dom2)) {
                        childrenDomain = context.getComputationNodeDomain();
                    }
                }
                j++;
            }
        }

        // Fill in the node domain of required partitioning properties that do not carry one yet.
        if (reqdProperties != null) {
            for (int k = 0; k < reqdProperties.length; k++) {
                IPhysicalPropertiesVector pv = reqdProperties[k];
                IPartitioningProperty pp = pv.getPartitioningProperty();
                if (pp != null && pp.getNodeDomain() == null) {
                    pp.setNodeDomain(childrenDomain);
                }
            }
        }

        // The child index of the child operator to optimize first.
        int startChildIndex = getStartChildIndex(op, pr, nestedPlan, context);
        IPartitioningProperty firstDeliveredPartitioning = null;
        // Enforce data properties in a top-down manner.
        for (int j = 0; j < op.getInputs().size(); j++) {
            // Building the FINEST messages stringifies property vectors and operators; only do so when the
            // messages would actually be published. Logger.finest performs the same level check internally,
            // so the emitted log output is unchanged.
            boolean logFinest = AlgebricksConfig.ALGEBRICKS_LOGGER.isLoggable(java.util.logging.Level.FINEST);

            // Starts from a partitioning-compatible child if any to loop over all children.
            int childIndex = (j + startChildIndex) % op.getInputs().size();
            IPhysicalPropertiesVector requiredProperty = reqdProperties[childIndex];
            AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
            IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();

            if (logFinest) {
                AlgebricksConfig.ALGEBRICKS_LOGGER.finest(
                        ">>>> Properties delivered by " + child.getPhysicalOperator() + ": " + delivered + "\n");
            }
            IPartitioningRequirementsCoordinator prc = pr.getPartitioningCoordinator();
            // Coordinates requirements by looking at the firstDeliveredPartitioning.
            Pair<Boolean, IPartitioningProperty> pbpp = prc.coordinateRequirements(
                    requiredProperty.getPartitioningProperty(), firstDeliveredPartitioning, op, context);
            boolean mayExpandPartitioningProperties = pbpp.first;
            IPhysicalPropertiesVector rqd = new StructuralPropertiesVector(pbpp.second,
                    requiredProperty.getLocalProperties());

            if (logFinest) {
                AlgebricksConfig.ALGEBRICKS_LOGGER
                        .finest(">>>> Required properties for " + child.getPhysicalOperator() + ": " + rqd + "\n");
            }
            // The partitioning property of reqdProperties[childIndex] could be updated here because
            // rqd.getPartitioningProperty() is the same object instance as requiredProperty.getPartitioningProperty().
            IPhysicalPropertiesVector diff = delivered.getUnsatisfiedPropertiesFrom(rqd,
                    mayExpandPartitioningProperties, context.getEquivalenceClassMap(child), context.getFDList(child));

            if (isRedundantSort(opRef, delivered, diff, context)) {
                opIsRedundantSort = true;
            }

            if (diff != null) {
                changed = true;
                addEnforcers(op, childIndex, diff, rqd, delivered, childrenDomain, nestedPlan, context);

                AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(childIndex)
                        .getValue());

                // A different operator now sits at this input position: re-evaluate against it.
                if (newChild != child) {
                    delivered = newChild.getDeliveredPhysicalProperties();
                    IPhysicalPropertiesVector newDiff = newPropertiesDiff(newChild, rqd,
                            mayExpandPartitioningProperties, context);
                    if (logFinest) {
                        AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> New properties diff: " + newDiff + "\n");
                    }

                    if (isRedundantSort(opRef, delivered, newDiff, context)) {
                        opIsRedundantSort = true;
                        // The operator is going to be removed below; remaining inputs are not processed.
                        break;
                    }
                }
            }

            // Remember the first partitioned (ordered or unordered) property delivered by an input; it is the
            // reference the coordinator uses for the inputs processed afterwards.
            if (firstDeliveredPartitioning == null) {
                IPartitioningProperty dpp = delivered.getPartitioningProperty();
                if (dpp.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED
                        || dpp.getPartitioningType() == PartitioningType.UNORDERED_PARTITIONED) {
                    firstDeliveredPartitioning = dpp;
                }
            }
        }

        if (op.hasNestedPlans()) {
            AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
            for (ILogicalPlan p : nested.getNestedPlans()) {
                if (physOptimizePlan(p, required, true, context)) {
                    changed = true;
                }
            }
        }

        if (opIsRedundantSort) {
            if (AlgebricksConfig.DEBUG) {
                AlgebricksConfig.ALGEBRICKS_LOGGER
                        .fine(">>>> Removing redundant SORT operator " + op.getPhysicalOperator() + "\n");
                printOp(op);
            }
            changed = true;
            // Replace the sort by its input, skipping a PROJECT that sits directly below the sort.
            AbstractLogicalOperator nextOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
            if (nextOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
                nextOp = (AbstractLogicalOperator) nextOp.getInputs().get(0).getValue();
            }
            opRef.setValue(nextOp);
            // Now, transfer annotations from the original sort op. to this one.
            AbstractLogicalOperator transferTo = nextOp;
            if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
                // remove duplicate exchange operator
                transferTo = (AbstractLogicalOperator) transferTo.getInputs().get(0).getValue();
            }
            transferTo.getAnnotations().putAll(op.getAnnotations());
            // Re-run the optimization on the operator that took the sort's place; 'changed' is already true,
            // so the result of this call is not needed.
            physOptimizeOp(opRef, required, nestedPlan, context);
        }
        return changed;
    }