in asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java [124:185]
protected abstract void initialize(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
MetadataTransactionContext mdTxnCtx) throws Exception;
private void createDatasets(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc) throws Exception {
Identifier subscriptionsTypeName = new Identifier(BADConstants.METADATA_TYPENAME_SUBSCRIPTIONS);
Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
//Setup the subscriptions dataset
List<List<String>> partitionFields = new ArrayList<>();
List<Integer> keyIndicators = new ArrayList<>();
keyIndicators.add(0);
List<String> fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.SubscriptionId);
partitionFields.add(fieldNames);
IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, null);
TypeExpression subItemType = new TypeReferenceExpression(
new Pair<>(MetadataConstants.METADATA_DATAVERSE_NAME, subscriptionsTypeName));
DatasetDecl createSubscriptionsDataset = new DatasetDecl(dataverseName, new Identifier(subscriptionsTableName),
subItemType, null, new HashMap<>(), DatasetType.INTERNAL, idd, null, true);
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
hcc, null);
if (!push) {
//Setup the results dataset
partitionFields = new ArrayList<>();
fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, null);
TypeExpression resultItemType =
new TypeReferenceExpression(new Pair<>(MetadataConstants.METADATA_DATAVERSE_NAME, resultsTypeName));
DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName),
resultItemType, null, new HashMap<>(), DatasetType.INTERNAL, idd, null, true);
//Create an index on timestamp for results
CreateIndexStatement createTimeIndex = new CreateIndexStatement();
createTimeIndex.setDatasetName(new Identifier(resultsTableName));
createTimeIndex.setDataverseName(dataverseName);
createTimeIndex.setIndexName(new Identifier(resultsTableName + "TimeIndex"));
createTimeIndex.setIfNotExists(false);
createTimeIndex.setIndexType(IndexType.BTREE);
createTimeIndex.setEnforced(false);
createTimeIndex.setGramLength(0);
List<String> fNames = new ArrayList<>();
fNames.add(BADConstants.ChannelExecutionTime);
Pair<List<String>, IndexedTypeExpression> fields = new Pair<>(fNames, null);
createTimeIndex.addFieldExprPair(fields);
createTimeIndex.addFieldIndexIndicator(0);
metadataProvider.getLocks().reset();
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset,
hcc, null);
metadataProvider.getLocks().reset();
//Create a time index for the results
((QueryTranslator) statementExecutor).handleCreateIndexStatement(metadataProvider, createTimeIndex, hcc,
null);
}
}