in asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteChannelTimeFunctionToLocalVarRule.java [106:143]
/**
 * Recursively walks the plan rooted at {@code opRef} and, above every data-source scan whose
 * record variable is listed in {@code needPrevDsSet} and/or {@code needCurrDsSet}, inserts an
 * assign operator that binds a fresh variable to the previous / current channel-time function
 * call for {@code channelName}.
 * <p>
 * When a scan's record variable appears in both sets, the "previous" assign is inserted first and
 * the "current" assign is stacked on top of it.
 *
 * @param opRef         mutable reference to the operator to inspect; re-pointed to the newly
 *                      inserted assign operator when one is added
 * @param needPrevDsSet record variables that need a previous-channel-time variable
 * @param needCurrDsSet record variables that need a current-channel-time variable
 * @param prevMap       output: record variable to the new previous-channel-time variable
 * @param currMap       output: record variable to the new current-channel-time variable
 * @param context       optimization context used to allocate new variables
 * @param channelName   channel name passed as the constant argument of the time functions
 */
private void createChannelTimeAssignOps(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> needPrevDsSet,
        Set<LogicalVariable> needCurrDsSet, Map<LogicalVariable, LogicalVariable> prevMap,
        Map<LogicalVariable, LogicalVariable> currMap, IOptimizationContext context, String channelName) {
    // Capture the original operator up front: opRef may be re-pointed to an inserted assign below,
    // and the recursion must descend through the original operator's inputs, not the new assign's.
    ILogicalOperator currOp = opRef.getValue();
    if (currOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
        DataSourceScanOperator dataScanOp = (DataSourceScanOperator) currOp;
        DataSource ds = (DataSource) dataScanOp.getDataSource();
        LogicalVariable dsVar = ds.getDataRecordVariable(dataScanOp.getScanVariables());
        if (needPrevDsSet.contains(dsVar)) {
            prevMap.put(dsVar, insertChannelTimeAssign(opRef, context, channelName, true));
        }
        if (needCurrDsSet.contains(dsVar)) {
            currMap.put(dsVar, insertChannelTimeAssign(opRef, context, channelName, false));
        }
    }
    for (Mutable<ILogicalOperator> input : currOp.getInputs()) {
        createChannelTimeAssignOps(input, needPrevDsSet, needCurrDsSet, prevMap, currMap, context, channelName);
    }
}

/**
 * Inserts an assign operator on top of the operator currently held by {@code opRef}. The assign
 * binds a newly allocated variable to a call of the previous-channel-time function (when
 * {@code previousTime} is true) or the current-channel-time function (otherwise), with
 * {@code channelName} as its single constant string argument.
 *
 * @return the newly allocated variable bound by the inserted assign operator
 */
private LogicalVariable insertChannelTimeAssign(Mutable<ILogicalOperator> opRef, IOptimizationContext context,
        String channelName, boolean previousTime) {
    LogicalVariable channelTimeVar = context.newVar();
    ILogicalExpression channelTimeExpr = new ScalarFunctionCallExpression(
            BuiltinFunctions.getBuiltinFunctionInfo(
                    previousTime ? BADFunctions.PREVIOUS_CHANNEL_TIME : BADFunctions.CURRENT_CHANNEL_TIME),
            new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AString(channelName)))));
    AssignOperator assignOp = new AssignOperator(channelTimeVar, new MutableObject<>(channelTimeExpr));
    // The previous occupant of opRef becomes the assign's only input; opRef now points at the assign.
    assignOp.getInputs().add(new MutableObject<>(opRef.getValue()));
    opRef.setValue(assignOp);
    return channelTimeVar;
}