in asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/SpatialJoinUtils.java [266:442]
/**
 * Rewrites the join {@code op} in place into a spatial join plan whose partitioning grid is derived from MBRs
 * that are computed at runtime for both inputs.
 *
 * The plan assembled here is, bottom-up:
 * <ol>
 * <li>a dynamic MBR calculator on each input branch (see {@code createDynamicMBRCalculator});</li>
 * <li>a broadcast nested loop join with a constant TRUE condition that pairs the left and right MBRs;</li>
 * <li>an assign that applies {@code GET_INTERSECTION} to the two MBRs;</li>
 * <li>a replicate operator whose output is consumed through three broadcast exchanges: one for the left
 * branch, one for the right branch, and one for the final reference point test join;</li>
 * <li>a TRUE-condition nested loop join on each branch that attaches the intersection MBR to that branch;</li>
 * <li>a tile-id unnest on each branch (see {@code injectSpatialTileUnnestOperator});</li>
 * <li>a new spatial join operator keyed on (tile id, input variable) for each side;</li>
 * <li>a nested loop join on top of the spatial join whose condition is the reference point test.</li>
 * </ol>
 * Finally {@code op} itself takes over the inputs, physical operator and condition of the reference point test
 * join, so the caller's operator becomes the root of the rewritten plan.
 *
 * @param op                  the join operator being rewritten; its inputs, physical operator, condition, type
 *                            environment and schema are all replaced by this method
 * @param context             optimization context used to create variables and compute type environments
 * @param spatialJoinFuncExpr the spatial predicate; it is appended to {@code conditionExprs} and, when it is the
 *                            only condition, it is cast to {@code ScalarFunctionCallExpression}
 * @param conditionExprs      the other join conditions; NOTE: this list is mutated (the spatial predicate is
 *                            appended to it)
 * @param leftInputOp         reference to the left input; its value is replaced by the join that attaches the
 *                            intersection MBR, and it is then handed to the tile-id unnest injection
 * @param rightInputOp        reference to the right input; treated symmetrically to {@code leftInputOp}
 * @param leftInputVar        left-side variable passed to the MBR calculator and the tile-id unnest, and used as
 *                            the second join key of the left branch
 * @param rightInputVar       right-side counterpart of {@code leftInputVar}
 * @throws AlgebricksException propagated from the helper methods and the type environment computations
 */
private static void buildSpatialJoinPlanWithDynamicMbr(AbstractBinaryJoinOperator op, IOptimizationContext context,
AbstractFunctionCallExpression spatialJoinFuncExpr, List<Mutable<ILogicalExpression>> conditionExprs,
Mutable<ILogicalOperator> leftInputOp, Mutable<ILogicalOperator> rightInputOp, LogicalVariable leftInputVar,
LogicalVariable rightInputVar) throws AlgebricksException {
// Add a dynamic workflow to compute MBR of the left branch.
// The returned triple is unpacked below as (global aggregate operator, aggregate result variables,
// exchange operator that later feeds the left TRUE-condition join).
Triple<MutableObject<ILogicalOperator>, List<LogicalVariable>, MutableObject<ILogicalOperator>> leftMBRCalculator =
createDynamicMBRCalculator(op, context, leftInputOp, leftInputVar);
MutableObject<ILogicalOperator> leftGlobalAgg = leftMBRCalculator.first;
List<LogicalVariable> leftGlobalAggResultVars = leftMBRCalculator.second;
MutableObject<ILogicalOperator> leftExchToJoinOpRef = leftMBRCalculator.third;
// The first aggregate result variable is used as the left MBR.
LogicalVariable leftMBRVar = leftGlobalAggResultVars.get(0);
// Add a dynamic workflow to compute MBR of the right branch (same unpacking as for the left branch)
Triple<MutableObject<ILogicalOperator>, List<LogicalVariable>, MutableObject<ILogicalOperator>> rightMBRCalculator =
createDynamicMBRCalculator(op, context, rightInputOp, rightInputVar);
MutableObject<ILogicalOperator> rightGlobalAgg = rightMBRCalculator.first;
List<LogicalVariable> rightGlobalAggResultVars = rightMBRCalculator.second;
MutableObject<ILogicalOperator> rightExchToJoinOpRef = rightMBRCalculator.third;
LogicalVariable rightMBRVar = rightGlobalAggResultVars.get(0);
// Join the left and right MBRs with a constant TRUE condition so that both MBR variables are available
// to the same downstream operator
Mutable<ILogicalExpression> trueCondition =
new MutableObject<>(new ConstantExpression(new AsterixConstantValue(ABoolean.TRUE)));
InnerJoinOperator unionMBRJoinOp = new InnerJoinOperator(trueCondition, leftGlobalAgg, rightGlobalAgg);
unionMBRJoinOp.setSourceLocation(op.getSourceLocation());
unionMBRJoinOp.setPhysicalOperator(new NestedLoopJoinPOperator(AbstractBinaryJoinOperator.JoinKind.INNER,
AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
MutableObject<ILogicalOperator> unionMBRJoinOpRef = new MutableObject<>(unionMBRJoinOp);
unionMBRJoinOp.recomputeSchema();
context.computeAndSetTypeEnvironmentForOperator(unionMBRJoinOp);
// Compute the intersection rectangle of left MBR and right MBR: GET_INTERSECTION(leftMBR, rightMBR)
List<Mutable<ILogicalExpression>> getIntersectionFuncInputExprs = new ArrayList<>();
getIntersectionFuncInputExprs.add(new MutableObject<>(new VariableReferenceExpression(leftMBRVar)));
getIntersectionFuncInputExprs.add(new MutableObject<>(new VariableReferenceExpression(rightMBRVar)));
ScalarFunctionCallExpression getIntersectionFuncExpr = new ScalarFunctionCallExpression(
BuiltinFunctions.getBuiltinFunctionInfo(BuiltinFunctions.GET_INTERSECTION),
getIntersectionFuncInputExprs);
getIntersectionFuncExpr.setSourceLocation(op.getSourceLocation());
Mutable<ILogicalExpression> intersectionMBRExpr = new MutableObject<>(getIntersectionFuncExpr);
// Bind the intersection to a fresh variable via an assign operator placed on top of the MBR join
LogicalVariable intersectionMBR = context.newVar();
AbstractLogicalOperator intersectionMBRAssignOperator =
new AssignOperator(intersectionMBR, intersectionMBRExpr);
intersectionMBRAssignOperator.setSourceLocation(op.getSourceLocation());
intersectionMBRAssignOperator.setExecutionMode(op.getExecutionMode());
intersectionMBRAssignOperator.setPhysicalOperator(new AssignPOperator());
// Note: the assign gets its own new reference to the MBR join operator, not unionMBRJoinOpRef itself
intersectionMBRAssignOperator.getInputs().add(new MutableObject<>(unionMBRJoinOpRef.getValue()));
context.computeAndSetTypeEnvironmentForOperator(intersectionMBRAssignOperator);
intersectionMBRAssignOperator.recomputeSchema();
MutableObject<ILogicalOperator> intersectionMBRAssignOperatorRef =
new MutableObject<>(intersectionMBRAssignOperator);
// Replicate the intersection MBR to the left and right nested loop join (NLJ) operators, and to another NLJ
// for the reference point test. The literal 3 matches the three broadcast exchanges created below
// (presumably it is the number of replicate outputs -- confirm against createReplicateOperator).
ReplicateOperator intersectionMBRReplicateOperator =
createReplicateOperator(intersectionMBRAssignOperatorRef, context, op.getSourceLocation(), 3);
// Replicate intersection MBR to the left branch
ExchangeOperator exchMBRToJoinOpLeft =
createBroadcastExchangeOp(intersectionMBRReplicateOperator, context, op.getSourceLocation());
MutableObject<ILogicalOperator> exchMBRToJoinOpLeftRef = new MutableObject<>(exchMBRToJoinOpLeft);
// The helper returns a (variable, operator reference) pair; the variable is what the left branch uses to
// refer to the intersection MBR
Pair<LogicalVariable, Mutable<ILogicalOperator>> createLeftAssignProjectOperatorResult =
createAssignProjectOperator(op, intersectionMBR, intersectionMBRReplicateOperator,
exchMBRToJoinOpLeftRef, context);
LogicalVariable leftIntersectionMBRVar = createLeftAssignProjectOperatorResult.getFirst();
Mutable<ILogicalOperator> leftIntersectionMBRRef = createLeftAssignProjectOperatorResult.getSecond();
// Replicate intersection MBR to the right branch
ExchangeOperator exchMBRToJoinOpRight =
createBroadcastExchangeOp(intersectionMBRReplicateOperator, context, op.getSourceLocation());
MutableObject<ILogicalOperator> exchMBRToJoinOpRightRef = new MutableObject<>(exchMBRToJoinOpRight);
Pair<LogicalVariable, Mutable<ILogicalOperator>> createRightAssignProjectOperatorResult =
createAssignProjectOperator(op, intersectionMBR, intersectionMBRReplicateOperator,
exchMBRToJoinOpRightRef, context);
LogicalVariable rightIntersectionMBRVar = createRightAssignProjectOperatorResult.getFirst();
Mutable<ILogicalOperator> rightIntersectionMBRRef = createRightAssignProjectOperatorResult.getSecond();
// Replicate intersection MBR to the right branch of a later Nested Loop Join reference point test.
// Unlike the two branches above, this output is used directly, under the original intersectionMBR variable.
ExchangeOperator exchMBRToReferencePointTestJoinOp =
createBroadcastExchangeOp(intersectionMBRReplicateOperator, context, op.getSourceLocation());
MutableObject<ILogicalOperator> exchMBRToReferencePointTestJoinOpRef =
new MutableObject<>(exchMBRToReferencePointTestJoinOp);
// Add left Join (TRUE): attaches the broadcast intersection MBR to the left branch
Mutable<ILogicalExpression> leftTrueCondition =
new MutableObject<>(new ConstantExpression(new AsterixConstantValue(ABoolean.TRUE)));
InnerJoinOperator leftJoinOp =
new InnerJoinOperator(leftTrueCondition, leftExchToJoinOpRef, leftIntersectionMBRRef);
leftJoinOp.setSourceLocation(op.getSourceLocation());
leftJoinOp.setPhysicalOperator(new NestedLoopJoinPOperator(AbstractBinaryJoinOperator.JoinKind.INNER,
AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
MutableObject<ILogicalOperator> leftJoinRef = new MutableObject<>(leftJoinOp);
leftJoinOp.recomputeSchema();
context.computeAndSetTypeEnvironmentForOperator(leftJoinOp);
// From here on the caller-supplied left input reference points at the new TRUE-condition join
leftInputOp.setValue(leftJoinRef.getValue());
// Add right Join (TRUE): attaches the broadcast intersection MBR to the right branch
Mutable<ILogicalExpression> rightTrueCondition =
new MutableObject<>(new ConstantExpression(new AsterixConstantValue(ABoolean.TRUE)));
InnerJoinOperator rightJoinOp =
new InnerJoinOperator(rightTrueCondition, rightExchToJoinOpRef, rightIntersectionMBRRef);
rightJoinOp.setSourceLocation(op.getSourceLocation());
rightJoinOp.setPhysicalOperator(new NestedLoopJoinPOperator(AbstractBinaryJoinOperator.JoinKind.INNER,
AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
MutableObject<ILogicalOperator> rightJoinRef = new MutableObject<>(rightJoinOp);
rightJoinOp.recomputeSchema();
context.computeAndSetTypeEnvironmentForOperator(rightJoinOp);
// From here on the caller-supplied right input reference points at the new TRUE-condition join
rightInputOp.setValue(rightJoinRef.getValue());
// Variable references to the intersection MBR as seen by the left branch, the right branch, and the
// reference point test join respectively
Mutable<ILogicalExpression> leftIntersectionMBRExpr =
new MutableObject<>(new VariableReferenceExpression(leftIntersectionMBRVar));
Mutable<ILogicalExpression> rightIntersectionMBRExpr =
new MutableObject<>(new VariableReferenceExpression(rightIntersectionMBRVar));
Mutable<ILogicalExpression> referencePointTestMBRExpr =
new MutableObject<>(new VariableReferenceExpression(intersectionMBR));
// Inject unnest operator to add tile ID to the left and right branch of the join operator.
// DEFAULT_ROWS / DEFAULT_COLUMNS are defined outside this method (presumably the grid dimensions -- confirm).
LogicalVariable leftTileIdVar = SpatialJoinUtils.injectSpatialTileUnnestOperator(context, leftInputOp,
leftInputVar, leftIntersectionMBRExpr, DEFAULT_ROWS, DEFAULT_COLUMNS);
LogicalVariable rightTileIdVar = SpatialJoinUtils.injectSpatialTileUnnestOperator(context, rightInputOp,
rightInputVar, rightIntersectionMBRExpr, DEFAULT_ROWS, DEFAULT_COLUMNS);
// Reference point test condition will be used as the condition of a Nested Loop Join operator after the
// spatial join operator. This design allows us to use the intersection MBR (a summary of the join)
// efficiently, instead of propagating this variable via the Hyracks context or data flow.
ScalarFunctionCallExpression referenceIdEquiJoinCondition =
createReferencePointTestCondition(op, referencePointTestMBRExpr, leftTileIdVar, rightTileIdVar,
leftInputVar, rightInputVar, DEFAULT_ROWS, DEFAULT_COLUMNS);
// Build the condition of the spatial join itself. Note that this appends to the caller's list.
conditionExprs.add(new MutableObject<>(spatialJoinFuncExpr));
ScalarFunctionCallExpression updatedJoinCondition;
if (conditionExprs.size() > 1) {
// Several conditions: combine all of them (including the spatial predicate) under AND
updatedJoinCondition = new ScalarFunctionCallExpression(
BuiltinFunctions.getBuiltinFunctionInfo(BuiltinFunctions.AND), conditionExprs);
updatedJoinCondition.setSourceLocation(op.getSourceLocation());
} else {
// Only the spatial predicate is present.
// NOTE(review): this downcast assumes spatialJoinFuncExpr is a ScalarFunctionCallExpression; any other
// AbstractFunctionCallExpression subtype would fail here with a ClassCastException.
updatedJoinCondition = (ScalarFunctionCallExpression) spatialJoinFuncExpr;
}
// NOTE(review): op's condition is set again near the end of this method (to the reference point test
// condition), so this assignment looks like it is superseded -- confirm whether it is still needed.
Mutable<ILogicalExpression> joinConditionRef = op.getCondition();
joinConditionRef.setValue(updatedJoinCondition);
// Join keys per branch: (tile id, input variable)
List<LogicalVariable> keysLeftBranch = new ArrayList<>();
keysLeftBranch.add(leftTileIdVar);
keysLeftBranch.add(leftInputVar);
List<LogicalVariable> keysRightBranch = new ArrayList<>();
keysRightBranch.add(rightTileIdVar);
keysRightBranch.add(rightInputVar);
// The actual spatial join is a new operator over the rewritten inputs; it reuses the schema of op
InnerJoinOperator spatialJoinOp =
new InnerJoinOperator(new MutableObject<>(updatedJoinCondition), leftInputOp, rightInputOp);
spatialJoinOp.setSourceLocation(op.getSourceLocation());
SpatialJoinUtils.setSpatialJoinOp(spatialJoinOp, keysLeftBranch, keysRightBranch, context);
spatialJoinOp.setSchema(op.getSchema());
context.computeAndSetTypeEnvironmentForOperator(spatialJoinOp);
// NOTE(review): opRef is a wrapper created locally; it is written below but never read, so it does not
// affect any reference held by the caller. The effective rewrite is the in-place mutation of op further down.
Mutable<ILogicalOperator> opRef = new MutableObject<>(op);
Mutable<ILogicalOperator> spatialJoinOpRef = new MutableObject<>(spatialJoinOp);
// Reference point test join: spatial join output on the left, broadcast intersection MBR on the right
InnerJoinOperator referencePointTestJoinOp =
new InnerJoinOperator(new MutableObject<>(referenceIdEquiJoinCondition), spatialJoinOpRef,
exchMBRToReferencePointTestJoinOpRef);
referencePointTestJoinOp.setPhysicalOperator(new NestedLoopJoinPOperator(
AbstractBinaryJoinOperator.JoinKind.INNER, AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
MutableObject<ILogicalOperator> referencePointTestJoinOpRef = new MutableObject<>(referencePointTestJoinOp);
referencePointTestJoinOp.setSourceLocation(op.getSourceLocation());
context.computeAndSetTypeEnvironmentForOperator(referencePointTestJoinOp);
referencePointTestJoinOp.recomputeSchema();
opRef.setValue(referencePointTestJoinOpRef.getValue());
// Turn op itself into the reference point test join: take over its inputs, physical operator and
// condition, then refresh op's type environment and schema
op.getInputs().clear();
op.getInputs().addAll(referencePointTestJoinOp.getInputs());
op.setPhysicalOperator(referencePointTestJoinOp.getPhysicalOperator());
op.getCondition().setValue(referencePointTestJoinOp.getCondition().getValue());
context.computeAndSetTypeEnvironmentForOperator(op);
op.recomputeSchema();
}