private FeedDataSource createFeedDataSource()

in asterix-bad/src/main/java/org/apache/asterix/bad/function/rewriter/BADFeedRewriter.java [117:161]


    private FeedDataSource createFeedDataSource(DataSourceId id, String targetDataset, String sourceFeedName,
            String subscriptionLocation, MetadataProvider metadataProvider, FeedPolicyEntity feedPolicy,
            String outputType, String locations, LogicalVariable recordVar, IOptimizationContext context,
            List<LogicalVariable> pkVars) throws AlgebricksException {
        Dataset dataset = metadataProvider.findDataset(id.getDataverseName(), targetDataset);
        ARecordType feedOutputType = (ARecordType) metadataProvider.findType(id.getDataverseName(), outputType);
        Feed sourceFeed = metadataProvider.findFeed(id.getDataverseName(), sourceFeedName);
        FeedConnection feedConnection =
                metadataProvider.findFeedConnection(id.getDataverseName(), sourceFeedName, targetDataset);
        // Is a change feed?
        List<IAType> pkTypes = null;
        List<List<String>> partitioningKeys = null;
        List<Integer> keySourceIndicator = null;

        List<ScalarFunctionCallExpression> keyAccessScalarFunctionCallExpression;
        if (ExternalDataUtils.isChangeFeed(sourceFeed.getConfiguration())) {
            List<Mutable<ILogicalExpression>> keyAccessExpression = new ArrayList<>();
            keyAccessScalarFunctionCallExpression = new ArrayList<>();
            pkTypes = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPrimaryKeyType();
            partitioningKeys = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
            if (dataset.hasMetaPart()) {
                keySourceIndicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
            }
            for (int i = 0; i < partitioningKeys.size(); i++) {
                List<String> key = partitioningKeys.get(i);
                if (keySourceIndicator == null || keySourceIndicator.get(i).intValue() == 0) {
                    PlanTranslationUtil.prepareVarAndExpression(key, recordVar, pkVars, keyAccessExpression, null,
                            context, null);
                } else {
                    PlanTranslationUtil.prepareMetaKeyAccessExpression(key, recordVar, keyAccessExpression, pkVars,
                            null, context, null);
                }
            }
            keyAccessExpression.forEach(
                    expr -> keyAccessScalarFunctionCallExpression.add((ScalarFunctionCallExpression) expr.getValue()));
        } else {
            keyAccessScalarFunctionCallExpression = null;
        }
        FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, id, targetDataset, feedOutputType, null, pkTypes,
                keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(),
                FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","), context.getComputationNodeDomain(),
                feedConnection);
        feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
        return feedDataSource;
    }