protected abstract void initialize()

in asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java [124:185]


    /**
     * Initialization hook that concrete channel statements must implement.
     *
     * NOTE(review): the call site is not visible in this section; presumably this is invoked
     * while the channel statement is being handled, within the supplied metadata transaction
     * -- confirm against the caller.
     *
     * @param statementExecutor the statement executor handed to this statement
     * @param metadataProvider the metadata provider handed to this statement
     * @param mdTxnCtx the metadata transaction context handed to this statement
     * @throws Exception if the subclass-specific initialization fails
     */
    protected abstract void initialize(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
            MetadataTransactionContext mdTxnCtx) throws Exception;

    /**
     * Creates the datasets that back this channel.
     * <p>
     * The subscriptions dataset (named {@code subscriptionsTableName}, keyed on
     * {@link BADConstants#SubscriptionId}) is always created. Unless this is a push channel, a
     * results dataset (named {@code resultsTableName}, keyed on {@link BADConstants#ResultId}) is
     * created as well, followed by a BTREE index on {@link BADConstants#ChannelExecutionTime}
     * named {@code resultsTableName + "TimeIndex"}.
     *
     * @param statementExecutor executor used to run the generated DDL; cast to {@link QueryTranslator}
     * @param metadataProvider metadata provider passed to each DDL handler; its locks are reset
     *            before the results dataset is created and again before the index is created
     * @param hcc Hyracks client connection passed to each DDL handler
     * @throws Exception if any of the dataset or index creation steps fails
     */
    private void createDatasets(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
            IHyracksClientConnection hcc) throws Exception {

        // Both datasets use a single key field and share this one key-indicator list.
        List<Integer> keyIndicators = new ArrayList<>();
        keyIndicators.add(0);

        //Setup the subscriptions dataset
        List<String> subscriptionKeyField = new ArrayList<>();
        subscriptionKeyField.add(BADConstants.SubscriptionId);
        List<List<String>> subscriptionPartitioning = new ArrayList<>();
        subscriptionPartitioning.add(subscriptionKeyField);
        IDatasetDetailsDecl subscriptionDetails =
                new InternalDetailsDecl(subscriptionPartitioning, keyIndicators, true, null, null);
        Identifier subscriptionsTypeName = new Identifier(BADConstants.METADATA_TYPENAME_SUBSCRIPTIONS);
        TypeExpression subscriptionItemType = new TypeReferenceExpression(
                new Pair<>(MetadataConstants.METADATA_DATAVERSE_NAME, subscriptionsTypeName));
        DatasetDecl subscriptionsDecl = new DatasetDecl(dataverseName, new Identifier(subscriptionsTableName),
                subscriptionItemType, null, new HashMap<>(), DatasetType.INTERNAL, subscriptionDetails, null, true);

        QueryTranslator translator = (QueryTranslator) statementExecutor;
        translator.handleCreateDatasetStatement(metadataProvider, subscriptionsDecl, hcc, null);

        // Push channels need no results dataset (and hence no index on it).
        if (push) {
            return;
        }

        //Setup the results dataset
        List<String> resultKeyField = new ArrayList<>();
        resultKeyField.add(BADConstants.ResultId);
        List<List<String>> resultPartitioning = new ArrayList<>();
        resultPartitioning.add(resultKeyField);
        IDatasetDetailsDecl resultDetails =
                new InternalDetailsDecl(resultPartitioning, keyIndicators, true, null, null);
        Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
        TypeExpression resultItemType =
                new TypeReferenceExpression(new Pair<>(MetadataConstants.METADATA_DATAVERSE_NAME, resultsTypeName));
        DatasetDecl resultsDecl = new DatasetDecl(dataverseName, new Identifier(resultsTableName), resultItemType,
                null, new HashMap<>(), DatasetType.INTERNAL, resultDetails, null, true);

        // Locks are reset before each subsequent DDL statement issued through the same provider.
        metadataProvider.getLocks().reset();
        translator.handleCreateDatasetStatement(metadataProvider, resultsDecl, hcc, null);
        metadataProvider.getLocks().reset();

        //Create an index on timestamp for results
        List<String> timeField = new ArrayList<>();
        timeField.add(BADConstants.ChannelExecutionTime);
        Pair<List<String>, IndexedTypeExpression> timeFieldExpr = new Pair<>(timeField, null);

        CreateIndexStatement timeIndexStmt = new CreateIndexStatement();
        timeIndexStmt.setDatasetName(new Identifier(resultsTableName));
        timeIndexStmt.setDataverseName(dataverseName);
        timeIndexStmt.setIndexName(new Identifier(resultsTableName + "TimeIndex"));
        timeIndexStmt.setIfNotExists(false);
        timeIndexStmt.setIndexType(IndexType.BTREE);
        timeIndexStmt.setEnforced(false);
        timeIndexStmt.setGramLength(0);
        timeIndexStmt.addFieldExprPair(timeFieldExpr);
        timeIndexStmt.addFieldIndexIndicator(0);

        translator.handleCreateIndexStatement(metadataProvider, timeIndexStmt, hcc, null);
    }