in asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslator.java [213:266]
protected void handleStartFeedStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
StartFeedStatement sfs = (StartFeedStatement) stmt;
DataverseName dataverseName = getActiveDataverseName(sfs.getDataverseName());
String feedName = sfs.getFeedName().getValue();
// Retrieve Feed entity from Metadata
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
metadataProvider.getMetadataTxnContext());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
// If it's a BAD feed
Map<String, String> feedConfig = feed.getConfiguration();
if (feedConfig.containsKey(BADConstants.BAD_FEED_FIELD_NAME_HOST)) {
String badHost = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_HOST, null);
String badChannelName = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_CHANNEL, null);
String badParameters = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_PARAMETERS, null);
String badDataverseName = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_CHANNEL_DV, null);
// construct statements
// add Broker to feed name as the broker name
try {
// create broker
String connStmtStr = String.format(
"USE %s;\n"
+ "DROP BROKER %BROKER IF EXISTS; CREATE BROKER %sBroker AT \"http://%s\" with {\"broker-type\" : \"BAD\"};\n",
badDataverseName, feed.getFeedName(), feed.getFeedName(),
feed.getConfiguration().get("addresses"));
BADLangUtils.executeStatement(badHost, connStmtStr);
// create subs
String[] params = badParameters.split(";");
StringBuilder subStmtStr = new StringBuilder(String.format("USE %s;\n ", badDataverseName));
for (String param : params) {
subStmtStr.append(String.format("SUBSCRIBE TO %s(%s) on %sBroker;", badChannelName, param,
feed.getFeedName()));
}
BADLangUtils.executeStatement(badHost, subStmtStr.toString());
} catch (Exception e) {
// drop broker and all subs if anything goes wrong
String dropStmtStr = String.format(
"USE %s;\n"
+ "DELETE FROM %sSubscriptions s WHERE s.BrokerName = \"%sBroker\"; DROP BROKER %sBroker",
badDataverseName, badChannelName, feed.getFeedName(), feed.getFeedName());
BADLangUtils.executeStatement(badHost, dropStmtStr);
throw e;
}
}
MetadataProvider badMetadataProvider = BADMetadataProvider.create(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
super.handleStartFeedStatement(badMetadataProvider, stmt, hcc);
}