in asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java [73:186]
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
return false;
}
boolean push = false;
AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
return false;
}
push = true;
}
DataSourceScanOperator subscriptionsScan;
DataverseName channelDataverse;
String channelName;
if (!push) {
DelegateOperator eOp = (DelegateOperator) op;
if (!(eOp.getDelegate() instanceof CommitOperator)) {
return false;
}
AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
return false;
}
InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp;
if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) {
return false;
}
DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
String datasetName = dds.getDataset().getDatasetName();
if (!dds.getDataset().getItemTypeDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)
|| !dds.getDataset().getItemTypeName().equals("ChannelResultsType")
|| !datasetName.endsWith("Results")) {
return false;
}
channelDataverse = dds.getDataset().getDataverseName();
//Now we know that we are inserting into results
channelName = datasetName.substring(0, datasetName.length() - 7);
String subscriptionsName = channelName + "Subscriptions";
subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
if (subscriptionsScan == null) {
return false;
}
} else {
//if push, get the channel name here instead
subscriptionsScan = (DataSourceScanOperator) findOp(op, "");
if (subscriptionsScan == null) {
return false;
}
DatasetDataSource dds = (DatasetDataSource) subscriptionsScan.getDataSource();
String datasetName = dds.getDataset().getDatasetName();
channelDataverse = dds.getDataset().getDataverseName();
channelName = datasetName.substring(0, datasetName.length() - 13);
}
//Now we need to get the broker EndPoint
LogicalVariable brokerEndpointVar = context.newVar();
LogicalVariable brokerTypeVar = context.newVar();
AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
if (opAboveBrokersScan == null) {
return false;
}
//get subscriptionIdVar
LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
//The channelExecutionTime is created just before the scan
ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
if (channelExecutionAssign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
return false;
}
LogicalVariable channelExecutionVar = ((AssignOperator) channelExecutionAssign).getVariables().get(0);
if (!channelExecutionVar.toString().equals("$$" + BADConstants.ChannelExecutionTime)) {
return false;
}
if (!push) {
((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false);
}
AssignOperator assignOp =
createbrokerEndPointAssignOperator(brokerEndpointVar, brokerTypeVar, opAboveBrokersScan);
visitGroupBy(op, assignOp, brokerEndpointVar, brokerTypeVar);
context.computeAndSetTypeEnvironmentForOperator(assignOp);
context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
context.computeAndSetTypeEnvironmentForOperator(op);
ProjectOperator badProject = (ProjectOperator) findOp(op1, "project");
badProject.getVariables().add(subscriptionIdVar);
badProject.getVariables().add(brokerEndpointVar);
badProject.getVariables().add(channelExecutionVar);
badProject.getVariables().add(brokerTypeVar);
context.computeAndSetTypeEnvironmentForOperator(badProject);
//Create my brokerNotify plan above the extension Operator
DelegateOperator dOp = push
? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
brokerTypeVar, context, op, (DistributeResultOperator) op1, channelDataverse, channelName)
: createNotifyBrokerPullPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, brokerTypeVar,
context, op, (DistributeResultOperator) op1, channelDataverse, channelName);
opRef.setValue(dOp);
return true;
}