public boolean rewrite()

in asterix-bad/src/main/java/org/apache/asterix/bad/function/rewriter/BADFeedRewriter.java [68:115]


    /**
     * Replaces the unnest operator held by {@code opRef} with a {@link DataSourceScanOperator}
     * that scans a feed data source.
     *
     * <p>The feed is described by the six string arguments of the function call extracted from
     * {@code opRef}; going by the local names they are, in order: dataverse (canonical form),
     * source feed name, target feed name, subscription location, target dataset and output type.
     * The feed policy name and the collect locations are read from the metadata provider's
     * configuration.
     *
     * @param opRef   reference to the unnest operator to rewrite; on success it points to the new scan
     * @param context optimization context supplying the metadata provider and fresh variables
     * @return always {@code true}
     * @throws AlgebricksException  propagated from the helpers invoked here
     * @throws CompilationException if the unnest has a positional variable, or if the configured
     *                              feed policy is neither found via the metadata provider nor
     *                              among the builtin policies
     */
    public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
        AbstractFunctionCallExpression feedCall = UnnestToDataScanRule.getFunctionCall(opRef);
        UnnestOperator unnestOp = (UnnestOperator) opRef.getValue();
        if (unnestOp.getPositionalVariable() != null) {
            throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnestOp.getSourceLocation(),
                    "No positional variables are allowed over feeds.");
        }

        // Unpack the constant string arguments of the function call.
        DataverseName dataverse =
                DataverseName.createFromCanonicalForm(ConstantExpressionUtil.getStringArgument(feedCall, 0));
        String sourceFeed = ConstantExpressionUtil.getStringArgument(feedCall, 1);
        String targetFeed = ConstantExpressionUtil.getStringArgument(feedCall, 2);
        String subscriptionLoc = ConstantExpressionUtil.getStringArgument(feedCall, 3);
        String datasetName = ConstantExpressionUtil.getStringArgument(feedCall, 4);
        String outputTypeName = ConstantExpressionUtil.getStringArgument(feedCall, 5);

        MetadataProvider mdProvider = (MetadataProvider) context.getMetadataProvider();
        DataSourceId feedSourceId = new DataSourceId(dataverse, targetFeed);

        // Resolve the feed policy: metadata lookup first, then the builtin policies.
        String feedPolicyName = (String) mdProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
        FeedPolicyEntity feedPolicy = mdProvider.findFeedPolicy(dataverse, feedPolicyName);
        if (feedPolicy == null) {
            feedPolicy = BuiltinFeedPolicies.getFeedPolicy(feedPolicyName);
        }
        if (feedPolicy == null) {
            throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnestOp.getSourceLocation(),
                    "Unknown feed policy:" + feedPolicyName);
        }

        String collectLocations = (String) mdProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
        LogicalVariable recordVar = unnestOp.getVariable();
        List<LogicalVariable> primaryKeyVars = new ArrayList<>();
        FeedDataSource feedSource = createFeedDataSource(feedSourceId, datasetName, sourceFeed, subscriptionLoc,
                mdProvider, feedPolicy, outputTypeName, collectLocations, recordVar, context, primaryKeyVars);

        // Scan output variables are ordered <Record-Meta-PK>.
        List<LogicalVariable> scanVars = new ArrayList<>();
        scanVars.add(recordVar);
        if (feedSource.hasMeta()) {
            // The data source reports a meta part: give it a fresh variable.
            scanVars.add(context.newVar());
        }
        if (feedSource.isChange()) {
            // The data source reports a change feed: expose the primary-key variables too.
            scanVars.addAll(primaryKeyVars);
        }

        // Build the scan, hand it the unnest's inputs, and swap it into the plan.
        DataSourceScanOperator scanOp = new DataSourceScanOperator(scanVars, feedSource);
        scanOp.setSourceLocation(unnestOp.getSourceLocation());
        scanOp.getInputs().addAll(unnestOp.getInputs());
        opRef.setValue(scanOp);
        context.computeAndSetTypeEnvironmentForOperator(scanOp);
        return true;
    }