protected void handleStartFeedStatement()

in asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslator.java [213:266]


    /**
     * Handles a START FEED statement. When the feed's configuration contains the BAD host entry, a broker
     * named {@code <feedName>Broker} is (re)created on the BAD host and one subscription per configured
     * parameter set is registered against the configured channel before the feed itself is started via the
     * superclass.
     *
     * @param metadataProvider provider used to look up the feed; its metadata transaction context is set
     *            to the transaction opened by this method
     * @param stmt the statement to handle; cast to {@link StartFeedStatement}
     * @param hcc Hyracks client connection, passed through to the superclass handler
     * @throws Exception if the feed lookup fails, if any statement sent to the BAD host fails (after a
     *             best-effort cleanup of the broker and its subscriptions), or if the superclass handler fails
     */
    protected void handleStartFeedStatement(MetadataProvider metadataProvider, Statement stmt,
            IHyracksClientConnection hcc) throws Exception {
        StartFeedStatement sfs = (StartFeedStatement) stmt;
        DataverseName dataverseName = getActiveDataverseName(sfs.getDataverseName());
        String feedName = sfs.getFeedName().getValue();

        // Retrieve Feed entity from Metadata
        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
        metadataProvider.setMetadataTxnContext(mdTxnCtx);
        Feed feed;
        try {
            feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
                    metadataProvider.getMetadataTxnContext());
            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
        } catch (Exception e) {
            // Do not leave the metadata transaction open when the lookup or the commit fails.
            try {
                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
            } catch (Exception abortFailure) {
                e.addSuppressed(abortFailure);
            }
            throw e;
        }

        // If it's a BAD feed
        Map<String, String> feedConfig = feed.getConfiguration();
        if (feedConfig.containsKey(BADConstants.BAD_FEED_FIELD_NAME_HOST)) {
            String badHost = feedConfig.get(BADConstants.BAD_FEED_FIELD_NAME_HOST);
            String badChannelName = feedConfig.get(BADConstants.BAD_FEED_FIELD_NAME_CHANNEL);
            String badParameters = feedConfig.get(BADConstants.BAD_FEED_FIELD_NAME_PARAMETERS);
            String badDataverseName = feedConfig.get(BADConstants.BAD_FEED_FIELD_NAME_CHANNEL_DV);
            String badFeedName = feed.getFeedName();

            // construct statements
            // add Broker to feed name as the broker name
            try {
                // create broker; both the DROP and the CREATE must target "<feedName>Broker"
                String connStmtStr = String.format(
                        "USE %s;\n"
                                + "DROP BROKER %sBroker IF EXISTS; CREATE BROKER %sBroker AT \"http://%s\" with {\"broker-type\" : \"BAD\"};\n",
                        badDataverseName, badFeedName, badFeedName, feedConfig.get("addresses"));
                BADLangUtils.executeStatement(badHost, connStmtStr);

                // create subs: one SUBSCRIBE statement per ';'-separated parameter set
                String[] params = badParameters.split(";");
                StringBuilder subStmtStr = new StringBuilder(String.format("USE %s;\n ", badDataverseName));
                for (String param : params) {
                    subStmtStr.append(
                            String.format("SUBSCRIBE TO %s(%s) on %sBroker;", badChannelName, param, badFeedName));
                }
                BADLangUtils.executeStatement(badHost, subStmtStr.toString());
            } catch (Exception e) {
                // drop broker and all subs if anything goes wrong
                String dropStmtStr = String.format(
                        "USE %s;\n"
                                + "DELETE FROM %sSubscriptions s WHERE s.BrokerName = \"%sBroker\"; DROP BROKER %sBroker",
                        badDataverseName, badChannelName, badFeedName, badFeedName);
                try {
                    BADLangUtils.executeStatement(badHost, dropStmtStr);
                } catch (Exception cleanupFailure) {
                    // Cleanup is best-effort: keep the original failure as the one reported to the caller.
                    e.addSuppressed(cleanupFailure);
                }
                throw e;
            }
        }
        MetadataProvider badMetadataProvider = BADMetadataProvider.create(metadataProvider.getApplicationContext(),
                metadataProvider.getDefaultDataverse());
        super.handleStartFeedStatement(badMetadataProvider, stmt, hcc);
    }