public void handle()

in asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java [90:161]


    /**
     * Drops the channel named by this statement.
     * <p>
     * The steps, in order: look the channel up inside a metadata transaction; if an active listener is
     * registered for the channel entity, shut down its executor service, deactivate and unregister it and
     * undeploy its job spec; delete the channel's metadata entity; drop the channel's results and
     * subscriptions datasets; commit the metadata transaction.
     * <p>
     * If the channel does not exist and {@code ifExists} is set, the transaction is committed and the
     * method returns without doing anything else.
     *
     * @param hcc client connection used to undeploy the job spec and to run the nested dataset drops
     * @param statementExecutor executor used to resolve the active dataverse; it is cast to
     *            {@code QueryTranslator} to run the nested dataset drops
     * @param requestParameters not used by this method
     * @param metadataProvider supplies the application context, the configuration copied into the
     *            temporary provider, and the locks released when the method exits
     * @param resultSetId not used by this method
     * @throws HyracksDataException wrapping any failure raised while dropping the channel, including
     *             the case where the channel does not exist and {@code ifExists} is not set
     * @throws AlgebricksException declared by the signature; failures inside the try block are wrapped
     *             in a {@code HyracksDataException} instead
     */
    public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
            IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
            throws HyracksDataException, AlgebricksException {
        DataverseName dataverse = statementExecutor.getActiveDataverseName(dataverseName);
        EntityId entityId = new EntityId(BADConstants.RUNTIME_ENTITY_CHANNEL, dataverse, channelName.getValue());
        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
        ActiveNotificationHandler activeEventHandler =
                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
        DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);

        // True from the moment the metadata transaction is opened until it has been committed, so that
        // every failure in between aborts the transaction instead of leaving it open.
        boolean txnActive = false;
        MetadataTransactionContext mdTxnCtx = null;
        try {
            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
            txnActive = true;
            Channel channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
            if (channel == null) {
                if (ifExists) {
                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                    txnActive = false;
                    return;
                } else {
                    throw new AlgebricksException("There is no channel with this name " + channelName + ".");
                }
            }

            if (listener == null) {
                //TODO: Channels need to better handle cluster failures
                LOGGER.log(Level.SEVERE,
                        "Tried to drop a Deployed Job  whose listener no longer exists:  " + entityId.getExtensionName()
                                + " " + entityId.getDataverseName() + "." + entityId.getEntityName() + ".");

            } else {
                // Stop accepting new work and give running executions a bounded time to finish.
                listener.getExecutorService().shutdown();
                if (!listener.getExecutorService().awaitTermination(BADConstants.EXECUTOR_TIMEOUT, TimeUnit.SECONDS)) {
                    LOGGER.log(Level.SEVERE,
                            "Executor Service is terminating non-gracefully for: " + entityId.getExtensionName() + " "
                                    + entityId.getDataverseName() + "." + entityId.getEntityName());
                }
                // Read the job spec id before deactivating the listener.
                DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
                listener.deActivate();
                activeEventHandler.unregisterListener(listener);
                if (deployedJobSpecId != null) {
                    hcc.undeployJobSpec(deployedJobSpecId);
                }
            }
            //Create a metadata provider to use in nested jobs.
            MetadataProvider tempMdProvider = MetadataProvider.create(appCtx, metadataProvider.getDefaultDataverse());
            tempMdProvider.getConfig().putAll(metadataProvider.getConfig());

            //Remove the Channel Metadata
            MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);

            //Drop the Channel Datasets
            //TODO: Need to find some way to handle if this fails.
            DropDatasetStatement dropStmt =
                    new DropDatasetStatement(dataverse, new Identifier(channel.getResultsDatasetName()), true);
            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
            // Reset the temporary provider's locks before reusing it for the second drop.
            tempMdProvider.getLocks().reset();
            dropStmt = new DropDatasetStatement(dataverse, new Identifier(channel.getSubscriptionsDataset()), true);
            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);

            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
            txnActive = false;
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Failed to drop channel " + channelName, e);
            try {
                if (txnActive) {
                    QueryTranslator.abort(e, e, mdTxnCtx);
                }
            } finally {
                // awaitTermination may have been interrupted; keep the interrupt visible to callers.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
            throw HyracksDataException.create(e);
        } finally {
            metadataProvider.getLocks().unlock();
        }
    }