public void handle()

in asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java [244:325]


    /**
     * Executes the create-channel statement inside one metadata transaction.
     * <p>
     * The work is done in three steps, in this order:
     * <ol>
     * <li>create the datasets backing the channel (via {@code createDatasets});</li>
     * <li>build the channel job and hand it to {@code BADJobService.setupExecutorJob};</li>
     * <li>add the {@code Channel} metadata entity and commit the transaction.</li>
     * </ol>
     * Before anything is created, the method rejects the request if a channel with the same name is
     * already in the metadata, if a listener registered for the channel's entity id reports it as
     * active, or if a dataset with the derived subscriptions/results name already exists.
     *
     * @param hcc               client connection passed through to dataset creation, job creation and
     *                          job setup
     * @param statementExecutor used to resolve the active dataverse name and passed to the helpers
     * @param requestContext    source of the max-result-reads setting, the result set and the stats
     * @param metadataProvider  supplies the application context and transaction-id factory; its
     *                          metadata transaction context is set here and its locks are unlocked
     *                          in the {@code finally} clause
     * @param resultSetId       not read by this method body
     * @throws HyracksDataException any exception raised inside the transactional section is logged,
     *                              the metadata transaction is aborted through
     *                              {@code QueryTranslator.abort}, and the exception is rethrown via
     *                              {@code HyracksDataException.create(e)}
     * @throws AlgebricksException  declared by the signature
     */
    public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
            IRequestParameters requestContext, MetadataProvider metadataProvider, int resultSetId)
            throws HyracksDataException, AlgebricksException {
        // TODO: Figure out how to handle the case where only a subset of the three steps succeeds.

        // Resolve the dataverse and derive the names of the datasets that back the channel.
        dataverseName = statementExecutor.getActiveDataverseName(dataverseName);
        subscriptionsTableName = channelName + BADConstants.subscriptionEnding;
        if (push) {
            // The results dataset name is left empty for push channels.
            resultsTableName = "";
        } else {
            resultsTableName = channelName + BADConstants.resultsEnding;
        }

        EntityId channelEntityId =
                new EntityId(BADConstants.RUNTIME_ENTITY_CHANNEL, dataverseName, channelName.getValue());
        ICcApplicationContext ccAppCtx = metadataProvider.getApplicationContext();
        ActiveNotificationHandler notificationHandler =
                (ActiveNotificationHandler) ccAppCtx.getActiveNotificationHandler();
        DeployedJobSpecEventListener jobListener =
                (DeployedJobSpecEventListener) notificationHandler.getListener(channelEntityId);

        MetadataTransactionContext txnCtx = null;
        try {
            txnCtx = MetadataManager.INSTANCE.beginTransaction();
            metadataProvider.setMetadataTxnContext(txnCtx);

            // Refuse to create a channel that is already in the metadata ...
            Channel existingChannel = BADLangExtension.getChannel(txnCtx, dataverseName, channelName.getValue());
            if (existingChannel != null) {
                throw new AlgebricksException("A channel with this name " + channelName + " already exists.");
            }
            // ... or whose previously registered listener still reports itself as active.
            if (jobListener != null && jobListener.isActive()) {
                throw new AsterixException("Channel " + channelName + " is already running");
            }
            initialize(statementExecutor, metadataProvider, txnCtx);

            // Make sure the derived dataset names are free before creating anything. The results
            // dataset is only looked up for non-push channels.
            if (MetadataManager.INSTANCE.getDataset(txnCtx, dataverseName, subscriptionsTableName) != null
                    || (!push && MetadataManager.INSTANCE.getDataset(txnCtx, dataverseName,
                            resultsTableName) != null)) {
                throw new AsterixException("The channel name:" + channelName + " is not available.");
            }

            // Dataset and job creation run against a replicated metadata provider.
            MetadataProvider replicatedProvider = BADUtils.replicateMetadataProvider(metadataProvider);
            replicatedProvider.setMaxResultReads(requestContext.getResultProperties().getMaxReads());
            final IResultSet channelResultSet = requestContext.getResultSet();
            final Stats channelStats = requestContext.getStats();
            replicatedProvider.getConfig().put(BADConstants.CONFIG_CHANNEL_NAME, channelName.getValue());

            // Step 1: create the channel datasets, then reset the replica's locks before compiling the job.
            createDatasets(statementExecutor, replicatedProvider, hcc);
            replicatedProvider.getLocks().reset();

            // Step 2: create the channel's internal job.
            JobSpecification channelJob =
                    createChannelJob(statementExecutor, replicatedProvider, hcc, channelResultSet, channelStats);

            // Register a listener for this channel only if none was found earlier.
            if (jobListener == null) {
                PrecompiledType channelKind = push ? PrecompiledType.PUSH_CHANNEL : PrecompiledType.CHANNEL;
                jobListener = new DeployedJobSpecEventListener(ccAppCtx, channelEntityId, channelKind);
                notificationHandler.registerListener(jobListener);
            }

            BADJobService.setupExecutorJob(channelEntityId, channelJob, hcc, jobListener,
                    metadataProvider.getTxnIdFactory(), duration);

            // Step 3: record the channel in the metadata and commit.
            Channel newChannel = new Channel(dataverseName, channelName.getValue(), subscriptionsTableName,
                    resultsTableName, function, duration, null, body);
            MetadataManager.INSTANCE.addEntity(txnCtx, newChannel);
            MetadataManager.INSTANCE.commitTransaction(txnCtx);
        } catch (Exception e) {
            // txnCtx is still null when beginTransaction() itself failed, so there is nothing to abort.
            if (txnCtx != null) {
                QueryTranslator.abort(e, e, txnCtx);
            }
            LOGGER.log(Level.WARNING, "Failed creating a channel", e);
            throw HyracksDataException.create(e);
        } finally {
            metadataProvider.getLocks().unlock();
        }

    }