in asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java [187:236]
private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IResultSet resultSet, Stats stats) throws Exception {
StringBuilder builder = new StringBuilder();
CharSequence dataverse = SqlppStatementUtil.encloseDataverseName(new StringBuilder(), dataverseName);
builder.append("SET inline_with \"false\";\n");
if (!push) {
builder.append("insert into " + dataverse + "." + resultsTableName);
builder.append(" as a (\n");
}
builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
builder.append("select result, ");
builder.append(BADConstants.ChannelExecutionTime + ", ");
builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
// builder.append("b." + BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_TYPE + " as "
// + BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_TYPE + ",");
// builder.append("b." + BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_END_POINT + " as " + BADConstants.METADATA_TYPE_FIELD_NAME_BROKER_END_POINT + ",");
builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n");
builder.append("from " + dataverse + "." + subscriptionsTableName + " sub,\n");
builder.append(
MetadataConstants.METADATA_DATAVERSE_NAME + ".`" + BADConstants.METADATA_DATASET_BROKER + "` b, \n");
builder.append(function.getDataverseName() + "." + function.getName() + "(");
for (int iter1 = 0; iter1 < function.getArity(); iter1++) {
if (iter1 > 0) {
builder.append(", ");
}
builder.append("sub.param" + iter1);
}
builder.append(") result \n");
builder.append("where sub." + BADConstants.METADATA_TYPE_FIELD_NAME_BROKERNAME + " /*+ bcast */ = b."
+ BADConstants.METADATA_TYPE_FIELD_NAME_BROKERNAME + "\n");
builder.append("and sub." + BADConstants.METADATA_TYPE_NAME_DATAVERSENAME + " /*+ bcast */ = b."
+ BADConstants.METADATA_TYPE_NAME_DATAVERSENAME + "\n");
if (!push) {
builder.append(")");
builder.append(" returning a");
}
builder.append(";");
body = builder.toString();
BADParserFactory factory = new BADParserFactory();
List<Statement> fStatements = factory.createParser(builder.toString()).parse();
SetStatement ss = (SetStatement) fStatements.get(0);
metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
if (push) {
return BADJobService.compilePushChannel(statementExecutor, metadataProvider, hcc,
(Query) fStatements.get(1));
}
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
hcc, resultSet, ResultDelivery.ASYNC, null, stats, true, null, null, null);
}