in asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslator.java [269:299]
protected void handleStopFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
StopFeedStatement sfst = (StopFeedStatement) stmt;
DataverseName dataverseName = getActiveDataverseName(sfst.getDataverseName());
String feedName = sfst.getFeedName().getValue();
// Retrieve Feed entity from Metadata
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
metadataProvider.getMetadataTxnContext());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
Map<String, String> feedConfig = feed.getConfiguration();
if (feedConfig.containsKey(BADConstants.BAD_FEED_FIELD_NAME_HOST)) {
String badHost = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_HOST, null);
String badChannelName = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_CHANNEL, null);
String badDataverseName = feedConfig.getOrDefault(BADConstants.BAD_FEED_FIELD_NAME_CHANNEL_DV, null);
// construct statements
String dropStmtStr = String.format(
"USE %s;\n"
+ "DELETE FROM %sSubscriptions s WHERE s.BrokerName = \"%sBroker\"; DROP BROKER %sBroker",
badDataverseName, badChannelName, feed.getFeedName(), feed.getFeedName());
// make request
int responseCode = BADLangUtils.executeStatement(badHost, dropStmtStr);
if (responseCode != 200) {
throw new AlgebricksException("Connecting to " + badHost + " failed");
}
}
}