in asterix-bad/src/main/java/org/apache/asterix/bad/function/rewriter/BADFeedRewriter.java [68:115]
public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
    // Replaces the unnest operator held by opRef with a DataSourceScanOperator over a feed data source.
    //
    // The unnest's function call supplies six constant string arguments, read by position:
    //   0: dataverse name (canonical form)   1: source feed name      2: target feed name
    //   3: subscription location             4: target dataset name   5: output type name
    // The feed policy name and the collect locations are read from the metadata provider's config.
    //
    // Always returns true once the operator has been replaced. Throws CompilationException when the
    // unnest carries a positional variable, or when the policy name resolves neither through the
    // metadata provider nor through the built-in policies.
    AbstractFunctionCallExpression feedCall = UnnestToDataScanRule.getFunctionCall(opRef);
    UnnestOperator unnestOp = (UnnestOperator) opRef.getValue();
    if (unnestOp.getPositionalVariable() != null) {
        throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnestOp.getSourceLocation(),
                "No positional variables are allowed over feeds.");
    }

    // Positional constant arguments of the function call.
    String canonicalDataverse = ConstantExpressionUtil.getStringArgument(feedCall, 0);
    DataverseName dataverseName = DataverseName.createFromCanonicalForm(canonicalDataverse);
    String sourceFeedName = ConstantExpressionUtil.getStringArgument(feedCall, 1);
    String targetFeedName = ConstantExpressionUtil.getStringArgument(feedCall, 2);
    String subscriptionLocation = ConstantExpressionUtil.getStringArgument(feedCall, 3);
    String targetDatasetName = ConstantExpressionUtil.getStringArgument(feedCall, 4);
    String outputTypeName = ConstantExpressionUtil.getStringArgument(feedCall, 5);

    MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
    DataSourceId feedSourceId = new DataSourceId(dataverseName, targetFeedName);

    // Resolve the policy: metadata lookup first, then the built-in policies as a fallback.
    String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
    FeedPolicyEntity feedPolicy = metadataProvider.findFeedPolicy(dataverseName, policyName);
    if (feedPolicy == null) {
        feedPolicy = BuiltinFeedPolicies.getFeedPolicy(policyName);
    }
    if (feedPolicy == null) {
        throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnestOp.getSourceLocation(),
                "Unknown feed policy:" + policyName);
    }

    String collectLocations = (String) metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
    LogicalVariable recordVar = unnestOp.getVariable();
    List<LogicalVariable> primaryKeyVars = new ArrayList<>();
    // The data source is created before the meta variable is allocated below; the context and the
    // primary-key list are both handed to the factory, so this ordering is kept deliberately.
    FeedDataSource feedSource = createFeedDataSource(feedSourceId, targetDatasetName, sourceFeedName,
            subscriptionLocation, metadataProvider, feedPolicy, outputTypeName, collectLocations, recordVar,
            context, primaryKeyVars);

    // Scan output variables follow the feed order <Record-Meta-PK>.
    List<LogicalVariable> scanVars = new ArrayList<>();
    scanVars.add(recordVar);
    if (feedSource.hasMeta()) {
        // The source produces a meta part: reserve a fresh variable for it.
        scanVars.add(context.newVar());
    }
    if (feedSource.isChange()) {
        // The source produces primary keys: expose the variables collected in primaryKeyVars.
        scanVars.addAll(primaryKeyVars);
    }

    // Swap the unnest for the scan, carrying over its source location and its inputs.
    DataSourceScanOperator scanOp = new DataSourceScanOperator(scanVars, feedSource);
    scanOp.setSourceLocation(unnestOp.getSourceLocation());
    scanOp.getInputs().addAll(unnestOp.getInputs());
    opRef.setValue(scanOp);
    context.computeAndSetTypeEnvironmentForOperator(scanOp);
    return true;
}