private static void buildSpatialJoinPlanWithDynamicMbr()

in asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/SpatialJoinUtils.java [266:442]


    private static void buildSpatialJoinPlanWithDynamicMbr(AbstractBinaryJoinOperator op, IOptimizationContext context,
            AbstractFunctionCallExpression spatialJoinFuncExpr, List<Mutable<ILogicalExpression>> conditionExprs,
            Mutable<ILogicalOperator> leftInputOp, Mutable<ILogicalOperator> rightInputOp, LogicalVariable leftInputVar,
            LogicalVariable rightInputVar) throws AlgebricksException {
        // Add a dynamic workflow to compute MBR of the left branch
        Triple<MutableObject<ILogicalOperator>, List<LogicalVariable>, MutableObject<ILogicalOperator>> leftMBRCalculator =
                createDynamicMBRCalculator(op, context, leftInputOp, leftInputVar);
        MutableObject<ILogicalOperator> leftGlobalAgg = leftMBRCalculator.first;
        List<LogicalVariable> leftGlobalAggResultVars = leftMBRCalculator.second;
        MutableObject<ILogicalOperator> leftExchToJoinOpRef = leftMBRCalculator.third;
        LogicalVariable leftMBRVar = leftGlobalAggResultVars.get(0);

        // Add a dynamic workflow to compute MBR of the right branch
        Triple<MutableObject<ILogicalOperator>, List<LogicalVariable>, MutableObject<ILogicalOperator>> rightMBRCalculator =
                createDynamicMBRCalculator(op, context, rightInputOp, rightInputVar);
        MutableObject<ILogicalOperator> rightGlobalAgg = rightMBRCalculator.first;
        List<LogicalVariable> rightGlobalAggResultVars = rightMBRCalculator.second;
        MutableObject<ILogicalOperator> rightExchToJoinOpRef = rightMBRCalculator.third;
        LogicalVariable rightMBRVar = rightGlobalAggResultVars.get(0);

        // Join the left and right union MBR
        Mutable<ILogicalExpression> trueCondition =
                new MutableObject<>(new ConstantExpression(new AsterixConstantValue(ABoolean.TRUE)));
        InnerJoinOperator unionMBRJoinOp = new InnerJoinOperator(trueCondition, leftGlobalAgg, rightGlobalAgg);
        unionMBRJoinOp.setSourceLocation(op.getSourceLocation());
        unionMBRJoinOp.setPhysicalOperator(new NestedLoopJoinPOperator(AbstractBinaryJoinOperator.JoinKind.INNER,
                AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
        MutableObject<ILogicalOperator> unionMBRJoinOpRef = new MutableObject<>(unionMBRJoinOp);
        unionMBRJoinOp.recomputeSchema();
        context.computeAndSetTypeEnvironmentForOperator(unionMBRJoinOp);

        // Compute the intersection rectangle of left MBR and right MBR
        List<Mutable<ILogicalExpression>> getIntersectionFuncInputExprs = new ArrayList<>();
        getIntersectionFuncInputExprs.add(new MutableObject<>(new VariableReferenceExpression(leftMBRVar)));
        getIntersectionFuncInputExprs.add(new MutableObject<>(new VariableReferenceExpression(rightMBRVar)));
        ScalarFunctionCallExpression getIntersectionFuncExpr = new ScalarFunctionCallExpression(
                BuiltinFunctions.getBuiltinFunctionInfo(BuiltinFunctions.GET_INTERSECTION),
                getIntersectionFuncInputExprs);
        getIntersectionFuncExpr.setSourceLocation(op.getSourceLocation());

        Mutable<ILogicalExpression> intersectionMBRExpr = new MutableObject<>(getIntersectionFuncExpr);
        LogicalVariable intersectionMBR = context.newVar();
        AbstractLogicalOperator intersectionMBRAssignOperator =
                new AssignOperator(intersectionMBR, intersectionMBRExpr);
        intersectionMBRAssignOperator.setSourceLocation(op.getSourceLocation());
        intersectionMBRAssignOperator.setExecutionMode(op.getExecutionMode());
        intersectionMBRAssignOperator.setPhysicalOperator(new AssignPOperator());
        intersectionMBRAssignOperator.getInputs().add(new MutableObject<>(unionMBRJoinOpRef.getValue()));
        context.computeAndSetTypeEnvironmentForOperator(intersectionMBRAssignOperator);
        intersectionMBRAssignOperator.recomputeSchema();
        MutableObject<ILogicalOperator> intersectionMBRAssignOperatorRef =
                new MutableObject<>(intersectionMBRAssignOperator);

        // Replicate the union MBR to left and right nested loop join(NLJ) operator, and another NLJ for reference point test
        ReplicateOperator intersectionMBRReplicateOperator =
                createReplicateOperator(intersectionMBRAssignOperatorRef, context, op.getSourceLocation(), 3);

        // Replicate union MBR to the left branch
        ExchangeOperator exchMBRToJoinOpLeft =
                createBroadcastExchangeOp(intersectionMBRReplicateOperator, context, op.getSourceLocation());
        MutableObject<ILogicalOperator> exchMBRToJoinOpLeftRef = new MutableObject<>(exchMBRToJoinOpLeft);
        Pair<LogicalVariable, Mutable<ILogicalOperator>> createLeftAssignProjectOperatorResult =
                createAssignProjectOperator(op, intersectionMBR, intersectionMBRReplicateOperator,
                        exchMBRToJoinOpLeftRef, context);
        LogicalVariable leftIntersectionMBRVar = createLeftAssignProjectOperatorResult.getFirst();
        Mutable<ILogicalOperator> leftIntersectionMBRRef = createLeftAssignProjectOperatorResult.getSecond();

        // Replicate union MBR to the right branch
        ExchangeOperator exchMBRToJoinOpRight =
                createBroadcastExchangeOp(intersectionMBRReplicateOperator, context, op.getSourceLocation());
        MutableObject<ILogicalOperator> exchMBRToJoinOpRightRef = new MutableObject<>(exchMBRToJoinOpRight);
        Pair<LogicalVariable, Mutable<ILogicalOperator>> createRightAssignProjectOperatorResult =
                createAssignProjectOperator(op, intersectionMBR, intersectionMBRReplicateOperator,
                        exchMBRToJoinOpRightRef, context);
        LogicalVariable rightIntersectionMBRVar = createRightAssignProjectOperatorResult.getFirst();
        Mutable<ILogicalOperator> rightIntersectionMBRRef = createRightAssignProjectOperatorResult.getSecond();

        // Replicate union MBR to the right branch of a later Nested Loop Join reference point test
        ExchangeOperator exchMBRToReferencePointTestJoinOp =
                createBroadcastExchangeOp(intersectionMBRReplicateOperator, context, op.getSourceLocation());
        MutableObject<ILogicalOperator> exchMBRToReferencePointTestJoinOpRef =
                new MutableObject<>(exchMBRToReferencePointTestJoinOp);

        // Add left Join (TRUE)
        Mutable<ILogicalExpression> leftTrueCondition =
                new MutableObject<>(new ConstantExpression(new AsterixConstantValue(ABoolean.TRUE)));
        InnerJoinOperator leftJoinOp =
                new InnerJoinOperator(leftTrueCondition, leftExchToJoinOpRef, leftIntersectionMBRRef);
        leftJoinOp.setSourceLocation(op.getSourceLocation());
        leftJoinOp.setPhysicalOperator(new NestedLoopJoinPOperator(AbstractBinaryJoinOperator.JoinKind.INNER,
                AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
        MutableObject<ILogicalOperator> leftJoinRef = new MutableObject<>(leftJoinOp);
        leftJoinOp.recomputeSchema();
        context.computeAndSetTypeEnvironmentForOperator(leftJoinOp);
        leftInputOp.setValue(leftJoinRef.getValue());

        // Add right Join (TRUE)
        Mutable<ILogicalExpression> rightTrueCondition =
                new MutableObject<>(new ConstantExpression(new AsterixConstantValue(ABoolean.TRUE)));
        InnerJoinOperator rightJoinOp =
                new InnerJoinOperator(rightTrueCondition, rightExchToJoinOpRef, rightIntersectionMBRRef);
        rightJoinOp.setSourceLocation(op.getSourceLocation());
        rightJoinOp.setPhysicalOperator(new NestedLoopJoinPOperator(AbstractBinaryJoinOperator.JoinKind.INNER,
                AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
        MutableObject<ILogicalOperator> rightJoinRef = new MutableObject<>(rightJoinOp);
        rightJoinOp.recomputeSchema();
        context.computeAndSetTypeEnvironmentForOperator(rightJoinOp);
        rightInputOp.setValue(rightJoinRef.getValue());

        Mutable<ILogicalExpression> leftIntersectionMBRExpr =
                new MutableObject<>(new VariableReferenceExpression(leftIntersectionMBRVar));
        Mutable<ILogicalExpression> rightIntersectionMBRExpr =
                new MutableObject<>(new VariableReferenceExpression(rightIntersectionMBRVar));
        Mutable<ILogicalExpression> referencePointTestMBRExpr =
                new MutableObject<>(new VariableReferenceExpression(intersectionMBR));

        // Inject unnest operator to add tile ID to the left and right branch of the join operator
        LogicalVariable leftTileIdVar = SpatialJoinUtils.injectSpatialTileUnnestOperator(context, leftInputOp,
                leftInputVar, leftIntersectionMBRExpr, DEFAULT_ROWS, DEFAULT_COLUMNS);
        LogicalVariable rightTileIdVar = SpatialJoinUtils.injectSpatialTileUnnestOperator(context, rightInputOp,
                rightInputVar, rightIntersectionMBRExpr, DEFAULT_ROWS, DEFAULT_COLUMNS);

        // Reference point test condition will be used as the condition of a Nested Loop Join operator after the
        // spatial join operator. This design allow us to use the union MBR (or summary of the join) efficiently,
        // instead of propagate this variable via Hyracks context or data flow.
        ScalarFunctionCallExpression referenceIdEquiJoinCondition =
                createReferencePointTestCondition(op, referencePointTestMBRExpr, leftTileIdVar, rightTileIdVar,
                        leftInputVar, rightInputVar, DEFAULT_ROWS, DEFAULT_COLUMNS);

        conditionExprs.add(new MutableObject<>(spatialJoinFuncExpr));

        ScalarFunctionCallExpression updatedJoinCondition;
        if (conditionExprs.size() > 1) {
            updatedJoinCondition = new ScalarFunctionCallExpression(
                    BuiltinFunctions.getBuiltinFunctionInfo(BuiltinFunctions.AND), conditionExprs);
            updatedJoinCondition.setSourceLocation(op.getSourceLocation());
        } else {
            updatedJoinCondition = (ScalarFunctionCallExpression) spatialJoinFuncExpr;
        }
        Mutable<ILogicalExpression> joinConditionRef = op.getCondition();
        joinConditionRef.setValue(updatedJoinCondition);

        List<LogicalVariable> keysLeftBranch = new ArrayList<>();
        keysLeftBranch.add(leftTileIdVar);
        keysLeftBranch.add(leftInputVar);

        List<LogicalVariable> keysRightBranch = new ArrayList<>();
        keysRightBranch.add(rightTileIdVar);
        keysRightBranch.add(rightInputVar);

        InnerJoinOperator spatialJoinOp =
                new InnerJoinOperator(new MutableObject<>(updatedJoinCondition), leftInputOp, rightInputOp);
        spatialJoinOp.setSourceLocation(op.getSourceLocation());
        SpatialJoinUtils.setSpatialJoinOp(spatialJoinOp, keysLeftBranch, keysRightBranch, context);
        spatialJoinOp.setSchema(op.getSchema());
        context.computeAndSetTypeEnvironmentForOperator(spatialJoinOp);

        Mutable<ILogicalOperator> opRef = new MutableObject<>(op);
        Mutable<ILogicalOperator> spatialJoinOpRef = new MutableObject<>(spatialJoinOp);

        InnerJoinOperator referencePointTestJoinOp =
                new InnerJoinOperator(new MutableObject<>(referenceIdEquiJoinCondition), spatialJoinOpRef,
                        exchMBRToReferencePointTestJoinOpRef);
        referencePointTestJoinOp.setPhysicalOperator(new NestedLoopJoinPOperator(
                AbstractBinaryJoinOperator.JoinKind.INNER, AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
        MutableObject<ILogicalOperator> referencePointTestJoinOpRef = new MutableObject<>(referencePointTestJoinOp);
        referencePointTestJoinOp.setSourceLocation(op.getSourceLocation());
        context.computeAndSetTypeEnvironmentForOperator(referencePointTestJoinOp);
        referencePointTestJoinOp.recomputeSchema();
        opRef.setValue(referencePointTestJoinOpRef.getValue());
        op.getInputs().clear();
        op.getInputs().addAll(referencePointTestJoinOp.getInputs());
        op.setPhysicalOperator(referencePointTestJoinOp.getPhysicalOperator());
        op.getCondition().setValue(referencePointTestJoinOp.getCondition().getValue());
        context.computeAndSetTypeEnvironmentForOperator(op);
        op.recomputeSchema();
    }