public boolean rewritePost()

in asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java [73:186]


    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
            throws AlgebricksException {
        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
        if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
            return false;
        }
        boolean push = false;

        AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
            if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
                return false;
            }
            push = true;
        }
        DataSourceScanOperator subscriptionsScan;
        DataverseName channelDataverse;
        String channelName;

        if (!push) {
            DelegateOperator eOp = (DelegateOperator) op;
            if (!(eOp.getDelegate() instanceof CommitOperator)) {
                return false;
            }
            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
            if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
                return false;
            }
            InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp;
            if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) {
                return false;
            }
            DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
            String datasetName = dds.getDataset().getDatasetName();
            if (!dds.getDataset().getItemTypeDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)
                    || !dds.getDataset().getItemTypeName().equals("ChannelResultsType")
                    || !datasetName.endsWith("Results")) {
                return false;
            }
            channelDataverse = dds.getDataset().getDataverseName();
            //Now we know that we are inserting into results

            channelName = datasetName.substring(0, datasetName.length() - 7);
            String subscriptionsName = channelName + "Subscriptions";
            subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
            if (subscriptionsScan == null) {
                return false;
            }

        } else {
            //if push, get the channel name here instead
            subscriptionsScan = (DataSourceScanOperator) findOp(op, "");
            if (subscriptionsScan == null) {
                return false;
            }
            DatasetDataSource dds = (DatasetDataSource) subscriptionsScan.getDataSource();
            String datasetName = dds.getDataset().getDatasetName();
            channelDataverse = dds.getDataset().getDataverseName();
            channelName = datasetName.substring(0, datasetName.length() - 13);
        }

        //Now we need to get the broker EndPoint
        LogicalVariable brokerEndpointVar = context.newVar();
        LogicalVariable brokerTypeVar = context.newVar();

        AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
        if (opAboveBrokersScan == null) {
            return false;
        }

        //get subscriptionIdVar
        LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);

        //The channelExecutionTime is created just before the scan
        ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
        if (channelExecutionAssign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
            return false;
        }
        LogicalVariable channelExecutionVar = ((AssignOperator) channelExecutionAssign).getVariables().get(0);
        if (!channelExecutionVar.toString().equals("$$" + BADConstants.ChannelExecutionTime)) {
            return false;
        }

        if (!push) {
            ((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false);
        }

        AssignOperator assignOp =
                createbrokerEndPointAssignOperator(brokerEndpointVar, brokerTypeVar, opAboveBrokersScan);

        visitGroupBy(op, assignOp, brokerEndpointVar, brokerTypeVar);

        context.computeAndSetTypeEnvironmentForOperator(assignOp);
        context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
        context.computeAndSetTypeEnvironmentForOperator(op);

        ProjectOperator badProject = (ProjectOperator) findOp(op1, "project");
        badProject.getVariables().add(subscriptionIdVar);
        badProject.getVariables().add(brokerEndpointVar);
        badProject.getVariables().add(channelExecutionVar);
        badProject.getVariables().add(brokerTypeVar);
        context.computeAndSetTypeEnvironmentForOperator(badProject);

        //Create my brokerNotify plan above the extension Operator
        DelegateOperator dOp = push
                ? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
                        brokerTypeVar, context, op, (DistributeResultOperator) op1, channelDataverse, channelName)
                : createNotifyBrokerPullPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, brokerTypeVar,
                        context, op, (DistributeResultOperator) op1, channelDataverse, channelName);

        opRef.setValue(dOp);

        return true;
    }