in asterix-bad/src/main/java/org/apache/asterix/bad/function/rewriter/BADFeedRewriter.java [117:161]
/**
 * Builds the {@link FeedDataSource} describing a feed that is connected to a target dataset.
 * <p>
 * The target dataset, the output record type, the source feed and the feed connection are all looked up
 * through {@code metadataProvider} in the dataverse named by {@code id}. If the feed's configuration is
 * reported as a change feed by {@link ExternalDataUtils#isChangeFeed}, the dataset's primary-key types and
 * one key-access expression per partitioning key are handed to the data source; otherwise both are
 * {@code null}. The supplied feed policy is stored in the data source properties under
 * {@link BuiltinFeedPolicies#CONFIG_FEED_POLICY_KEY}.
 * <p>
 * NOTE(review): none of the metadata lookups is null-checked here; callers presumably guarantee that the
 * dataset, type and feed exist -- confirm.
 *
 * @param id                   data source id; its dataverse name is used for every metadata lookup
 * @param targetDataset        name of the dataset the feed is connected to
 * @param sourceFeedName       name of the feed to look up
 * @param subscriptionLocation name of a {@link FeedRuntimeType} constant (parsed with {@code valueOf})
 * @param metadataProvider     provider used for the dataset, type, feed and feed-connection lookups
 * @param feedPolicy           policy stored in the resulting data source's properties
 * @param outputType           name of the record type produced by the feed
 * @param locations            comma-separated list, split on {@code ","} before being passed on
 * @param recordVar            variable passed to the key-access expression helpers
 * @param context              optimization context; also supplies the computation node domain
 * @param pkVars               list passed to the key-access expression helpers (presumably filled with
 *                             the primary-key variables -- confirm against {@code PlanTranslationUtil})
 * @return the configured feed data source
 * @throws AlgebricksException if raised by the metadata lookups or the expression helpers
 */
private FeedDataSource createFeedDataSource(DataSourceId id, String targetDataset, String sourceFeedName,
        String subscriptionLocation, MetadataProvider metadataProvider, FeedPolicyEntity feedPolicy,
        String outputType, String locations, LogicalVariable recordVar, IOptimizationContext context,
        List<LogicalVariable> pkVars) throws AlgebricksException {
    // Resolve the metadata entities involved, all within the dataverse of the data source id.
    Dataset targetDs = metadataProvider.findDataset(id.getDataverseName(), targetDataset);
    ARecordType recordType = (ARecordType) metadataProvider.findType(id.getDataverseName(), outputType);
    Feed feed = metadataProvider.findFeed(id.getDataverseName(), sourceFeedName);
    FeedConnection connection =
            metadataProvider.findFeedConnection(id.getDataverseName(), sourceFeedName, targetDataset);

    // Both stay null unless the feed is a change feed.
    List<IAType> primaryKeyTypes = null;
    List<ScalarFunctionCallExpression> keyAccessors = null;

    if (ExternalDataUtils.isChangeFeed(feed.getConfiguration())) {
        InternalDatasetDetails details = (InternalDatasetDetails) targetDs.getDatasetDetails();
        primaryKeyTypes = details.getPrimaryKeyType();
        List<List<String>> keyPaths = details.getPartitioningKey();
        // The key source indicator is only consulted for datasets that have a meta part.
        List<Integer> keySources = targetDs.hasMetaPart() ? details.getKeySourceIndicator() : null;

        List<Mutable<ILogicalExpression>> keyExprs = new ArrayList<>();
        int position = 0;
        for (List<String> keyPath : keyPaths) {
            // A non-zero indicator selects the meta-key helper; everything else uses the record helper.
            boolean useMetaAccess = keySources != null && keySources.get(position).intValue() != 0;
            if (useMetaAccess) {
                PlanTranslationUtil.prepareMetaKeyAccessExpression(keyPath, recordVar, keyExprs, pkVars, null,
                        context, null);
            } else {
                PlanTranslationUtil.prepareVarAndExpression(keyPath, recordVar, pkVars, keyExprs, null, context,
                        null);
            }
            position++;
        }

        // Unwrap each mutable reference into the scalar function call it holds.
        keyAccessors = new ArrayList<>();
        for (Mutable<ILogicalExpression> exprRef : keyExprs) {
            keyAccessors.add((ScalarFunctionCallExpression) exprRef.getValue());
        }
    }

    FeedDataSource result = new FeedDataSource(feed, id, targetDataset, recordType, null, primaryKeyTypes,
            keyAccessors, feed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation),
            locations.split(","), context.getComputationNodeDomain(), connection);
    result.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
    return result;
}