in algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java [191:330]
/**
 * Enforces the physical properties required by the operator in {@code opRef} on each of its
 * inputs, recursing into the inputs first and into any nested plans afterwards.
 * <p>
 * The method works in these phases:
 * <ol>
 * <li>ask the operator which properties it requires from its children;</li>
 * <li>recursively optimize every child, compute its delivered properties and derive a single
 * node domain for all children;</li>
 * <li>fill that domain into every required partitioning property that has none yet;</li>
 * <li>for every child, compute the unsatisfied part of the requirement and call
 * {@code addEnforcers} for it;</li>
 * <li>optimize nested plans, if the operator has any;</li>
 * <li>if the operator was detected to be a redundant sort, replace it in {@code opRef} by its
 * input and optimize the replacement again.</li>
 * </ol>
 *
 * @param opRef
 *            reference to the operator to optimize; its value is replaced when the operator
 *            turns out to be a redundant sort
 * @param required
 *            properties required from this operator; forwarded to
 *            {@code getRequiredPhysicalPropertiesForChildren} and to the nested plans
 * @param nestedPlan
 *            flag forwarded unchanged to the recursive calls on the inputs and to the helper
 *            methods; nested plans of this operator are always optimized with {@code true}
 * @param context
 *            the optimization context
 * @return {@code true} if a child changed, an enforcer was requested, a nested plan changed or
 *         a redundant sort was removed; {@code false} otherwise
 * @throws AlgebricksException
 *             propagated from the helper methods and recursive calls
 * @throws IllegalStateException
 *             if the operator has inputs but supplies no required properties for them
 */
private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, IPhysicalPropertiesVector required,
        boolean nestedPlan, IOptimizationContext context) throws AlgebricksException {
    boolean changed = false;
    AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
    optimizeUsingConstraintsAndEquivClasses(op);
    PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required, context);
    IPhysicalPropertiesVector[] reqdProperties = null;
    if (pr != null) {
        reqdProperties = pr.getRequiredProperties();
    }
    // The loops below index into reqdProperties and dereference pr for every input. Without
    // requirements they would fail with an uninformative NullPointerException, so fail fast with
    // a message that names the offending operator instead.
    if (reqdProperties == null && !op.getInputs().isEmpty()) {
        throw new IllegalStateException(
                "No required physical properties for the inputs of operator " + op.getOperatorTag());
    }
    boolean opIsRedundantSort = false;
    // compute properties and figure out the domain
    INodeDomain childrenDomain = null;
    {
        int j = 0;
        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
            AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
            // recursive call
            if (physOptimizeOp(childRef, reqdProperties[j], nestedPlan, context)) {
                changed = true;
            }
            child.computeDeliveredPhysicalProperties(context);
            IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
            if (childrenDomain == null) {
                // The first child (or any child seen while the domain is still null) sets the domain.
                childrenDomain = delivered.getPartitioningProperty().getNodeDomain();
            } else {
                // Children that disagree on the domain fall back to the computation node domain.
                INodeDomain dom2 = delivered.getPartitioningProperty().getNodeDomain();
                if (!childrenDomain.sameAs(dom2)) {
                    childrenDomain = context.getComputationNodeDomain();
                }
            }
            j++;
        }
    }
    // Give every required partitioning property that has no node domain yet the children's domain.
    if (reqdProperties != null) {
        for (int k = 0; k < reqdProperties.length; k++) {
            IPhysicalPropertiesVector pv = reqdProperties[k];
            IPartitioningProperty pp = pv.getPartitioningProperty();
            if (pp != null && pp.getNodeDomain() == null) {
                pp.setNodeDomain(childrenDomain);
            }
        }
    }
    // The child index of the child operator to optimize first.
    int startChildIndex = getStartChildIndex(op, pr, nestedPlan, context);
    // Partitioning delivered by the first partitioned child; handed to the coordinator for the
    // children processed after it.
    IPartitioningProperty firstDeliveredPartitioning = null;
    // Enforce data properties in a top-down manner.
    for (int j = 0; j < op.getInputs().size(); j++) {
        // Starts from a partitioning-compatible child if any to loop over all children.
        int childIndex = (j + startChildIndex) % op.getInputs().size();
        IPhysicalPropertiesVector requiredProperty = reqdProperties[childIndex];
        AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
        IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
        AlgebricksConfig.ALGEBRICKS_LOGGER
                .finest(">>>> Properties delivered by " + child.getPhysicalOperator() + ": " + delivered + "\n");
        IPartitioningRequirementsCoordinator prc = pr.getPartitioningCoordinator();
        // Coordinates requirements by looking at the firstDeliveredPartitioning.
        Pair<Boolean, IPartitioningProperty> pbpp = prc.coordinateRequirements(
                requiredProperty.getPartitioningProperty(), firstDeliveredPartitioning, op, context);
        boolean mayExpandPartitioningProperties = pbpp.first;
        IPhysicalPropertiesVector rqd = new StructuralPropertiesVector(pbpp.second,
                requiredProperty.getLocalProperties());
        AlgebricksConfig.ALGEBRICKS_LOGGER
                .finest(">>>> Required properties for " + child.getPhysicalOperator() + ": " + rqd + "\n");
        // The partitioning property of reqdProperties[childIndex] could be updated here because
        // rqd.getPartitioningProperty() is the same object instance as requiredProperty.getPartitioningProperty().
        IPhysicalPropertiesVector diff = delivered.getUnsatisfiedPropertiesFrom(rqd,
                mayExpandPartitioningProperties, context.getEquivalenceClassMap(child), context.getFDList(child));
        if (isRedundantSort(opRef, delivered, diff, context)) {
            opIsRedundantSort = true;
        }
        // A non-null diff holds the requirements the child does not satisfy yet.
        if (diff != null) {
            changed = true;
            addEnforcers(op, childIndex, diff, rqd, delivered, childrenDomain, nestedPlan, context);
            AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(childIndex)
                    .getValue());
            // If the input at childIndex now holds a different operator, re-evaluate the
            // requirement against that operator's delivered properties.
            if (newChild != child) {
                delivered = newChild.getDeliveredPhysicalProperties();
                IPhysicalPropertiesVector newDiff = newPropertiesDiff(newChild, rqd,
                        mayExpandPartitioningProperties, context);
                AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> New properties diff: " + newDiff + "\n");
                if (isRedundantSort(opRef, delivered, newDiff, context)) {
                    opIsRedundantSort = true;
                    // Stops processing the remaining children; op is replaced below.
                    break;
                }
            }
        }
        // Remember the first ordered/unordered partitioning delivered by a child.
        if (firstDeliveredPartitioning == null) {
            IPartitioningProperty dpp = delivered.getPartitioningProperty();
            if (dpp.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED
                    || dpp.getPartitioningType() == PartitioningType.UNORDERED_PARTITIONED) {
                firstDeliveredPartitioning = dpp;
            }
        }
    }
    // Nested plans are optimized with the same required properties and nestedPlan set to true.
    if (op.hasNestedPlans()) {
        AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
        for (ILogicalPlan p : nested.getNestedPlans()) {
            if (physOptimizePlan(p, required, true, context)) {
                changed = true;
            }
        }
    }
    if (opIsRedundantSort) {
        if (AlgebricksConfig.DEBUG) {
            AlgebricksConfig.ALGEBRICKS_LOGGER
                    .fine(">>>> Removing redundant SORT operator " + op.getPhysicalOperator() + "\n");
            printOp(op);
        }
        changed = true;
        // Replace the sort by its first input; a PROJECT directly below the sort is skipped too,
        // so the replacement is then the PROJECT's own input.
        AbstractLogicalOperator nextOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
        if (nextOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
            nextOp = (AbstractLogicalOperator) nextOp.getInputs().get(0).getValue();
        }
        opRef.setValue(nextOp);
        // Now, transfer annotations from the original sort op. to this one.
        AbstractLogicalOperator transferTo = nextOp;
        if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
            // When the replacement is an EXCHANGE, the annotations go to the operator beneath it.
            transferTo = (AbstractLogicalOperator) transferTo.getInputs().get(0).getValue();
        }
        transferTo.getAnnotations().putAll(op.getAnnotations());
        // Re-run the optimization on the replacement operator. Its result is not needed because
        // changed is already true at this point.
        physOptimizeOp(opRef, required, nestedPlan, context);
    }
    return changed;
}