private JobSpecification createChannelJob()

in asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java [187:236]


    /**
     * Generates the SQL++ text that implements this channel, stores it in {@code body}, parses it,
     * and passes the parsed channel statement on for compilation.
     *
     * <p>The generated text consists of two statements: a {@code SET inline_with "false"} statement
     * followed by the channel query. The query joins the subscriptions dataset with the broker
     * metadata dataset and applies the channel function to {@code sub.param0 .. sub.param(arity-1)}.
     * When {@code push} is false, the query is wrapped in an {@code insert into <results> ... returning}
     * statement; otherwise the bare query is used.
     *
     * @param statementExecutor executor used for compilation; cast to {@code QueryTranslator} when
     *        {@code push} is false
     * @param metadataProvider provider whose config receives the property from the leading SET statement
     * @param hcc Hyracks client connection forwarded to the compilation call
     * @param resultSet result set forwarded to the insert path (unused when {@code push} is true)
     * @param stats statistics object forwarded to the insert path (unused when {@code push} is true)
     * @return the job specification returned by {@code BADJobService.compilePushChannel} when
     *         {@code push} is true, or by {@code handleInsertUpsertStatement} otherwise
     * @throws Exception if parsing or either compilation call fails
     */
    private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
            IHyracksClientConnection hcc, IResultSet resultSet, Stats stats) throws Exception {
        CharSequence quotedDataverse = SqlppStatementUtil.encloseDataverseName(new StringBuilder(), dataverseName);
        final boolean storeResults = !push;

        StringBuilder query = new StringBuilder();
        query.append("SET inline_with \"false\";\n");

        // Channels that are not pushed store their results: open the enclosing insert statement.
        if (storeResults) {
            query.append("insert into ").append(quotedDataverse).append(".").append(resultsTableName)
                    .append(" as a (\n");
        }

        // Projection. NOTE: broker type / end point are not part of the projection.
        query.append("with ").append(BADConstants.ChannelExecutionTime).append(" as current_datetime() \n");
        query.append("select result, ").append(BADConstants.ChannelExecutionTime).append(", ");
        query.append("sub.").append(BADConstants.SubscriptionId).append(" as ").append(BADConstants.SubscriptionId)
                .append(",");
        query.append("current_datetime() as ").append(BADConstants.DeliveryTime).append("\n");

        // Sources: subscriptions, broker metadata, and the channel function applied per subscription.
        query.append("from ").append(quotedDataverse).append(".").append(subscriptionsTableName).append(" sub,\n");
        query.append(MetadataConstants.METADATA_DATAVERSE_NAME).append(".`")
                .append(BADConstants.METADATA_DATASET_BROKER).append("` b, \n");
        // NOTE(review): unlike the channel dataverse above, the function's dataverse name is appended
        // as-is rather than through encloseDataverseName -- confirm this is safe for names needing quotes.
        query.append(function.getDataverseName()).append(".").append(function.getName()).append("(");
        String separator = "";
        for (int paramIndex = 0; paramIndex < function.getArity(); paramIndex++) {
            query.append(separator).append("sub.param").append(paramIndex);
            separator = ", ";
        }
        query.append(") result \n");

        // Join subscriptions to brokers on broker name and dataverse name.
        query.append("where sub.").append(BADConstants.METADATA_TYPE_FIELD_NAME_BROKERNAME)
                .append(" /*+ bcast */ = b.").append(BADConstants.METADATA_TYPE_FIELD_NAME_BROKERNAME).append("\n");
        query.append("and sub.").append(BADConstants.METADATA_TYPE_NAME_DATAVERSENAME)
                .append(" /*+ bcast */ = b.").append(BADConstants.METADATA_TYPE_NAME_DATAVERSENAME).append("\n");

        if (storeResults) {
            query.append(") returning a");
        }
        query.append(";");

        // Keep the generated text on the statement before parsing it.
        final String channelQuery = query.toString();
        body = channelQuery;

        List<Statement> parsed = new BADParserFactory().createParser(channelQuery).parse();

        // Statement 0 is the SET emitted above; copy its property into the compiler configuration.
        SetStatement inlineWithSetting = (SetStatement) parsed.get(0);
        metadataProvider.getConfig().put(inlineWithSetting.getPropName(), inlineWithSetting.getPropValue());

        // Statement 1 is the channel body: an insert when results are stored, a plain query when pushed.
        if (storeResults) {
            return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, parsed.get(1),
                    hcc, resultSet, ResultDelivery.ASYNC, null, stats, true, null, null, null);
        }
        return BADJobService.compilePushChannel(statementExecutor, metadataProvider, hcc, (Query) parsed.get(1));
    }