in asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java [90:161]
/**
 * Drops a channel: shuts down and unregisters its active-job listener, undeploys its
 * deployed job spec, deletes the channel metadata entity, and drops the channel's
 * results and subscriptions datasets.
 *
 * @param hcc               client connection used to undeploy the job and run nested drops
 * @param statementExecutor the executor (a {@link QueryTranslator}) handling this statement
 * @param requestParameters unused here; part of the extension-statement contract
 * @param metadataProvider  provides the metadata transaction context and lock list
 * @param resultSetId       unused here; part of the extension-statement contract
 * @throws AlgebricksException  if the channel does not exist and {@code ifExists} is false
 * @throws HyracksDataException wrapping any other failure during the drop
 */
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
        IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
        throws HyracksDataException, AlgebricksException {
    DataverseName dataverse = statementExecutor.getActiveDataverseName(dataverseName);
    boolean txnActive = false;
    EntityId entityId = new EntityId(BADConstants.RUNTIME_ENTITY_CHANNEL, dataverse, channelName.getValue());
    ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
    ActiveNotificationHandler activeEventHandler =
            (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
    DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
    Channel channel = null;
    MetadataTransactionContext mdTxnCtx = null;
    try {
        mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
        txnActive = true;
        channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
        if (channel == null) {
            if (ifExists) {
                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                txnActive = false;
                return;
            } else {
                throw new AlgebricksException("There is no channel with this name " + channelName + ".");
            }
        }
        if (listener == null) {
            //TODO: Channels need to better handle cluster failures
            LOGGER.log(Level.SEVERE,
                    "Tried to drop a Deployed Job whose listener no longer exists: " + entityId.getExtensionName()
                            + " " + entityId.getDataverseName() + "." + entityId.getEntityName() + ".");
        } else {
            listener.getExecutorService().shutdown();
            if (!listener.getExecutorService().awaitTermination(BADConstants.EXECUTOR_TIMEOUT, TimeUnit.SECONDS)) {
                LOGGER.log(Level.SEVERE,
                        "Executor Service is terminating non-gracefully for: " + entityId.getExtensionName() + " "
                                + entityId.getDataverseName() + "." + entityId.getEntityName());
            }
            DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
            listener.deActivate();
            activeEventHandler.unregisterListener(listener);
            if (deployedJobSpecId != null) {
                hcc.undeployJobSpec(deployedJobSpecId);
            }
        }
        //Create a metadata provider to use in nested jobs.
        MetadataProvider tempMdProvider = MetadataProvider.create(appCtx, metadataProvider.getDefaultDataverse());
        tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
        //Drop the Channel Datasets
        //TODO: Need to find some way to handle if this fails.
        //Remove the Channel Metadata
        MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
        DropDatasetStatement dropStmt =
                new DropDatasetStatement(dataverse, new Identifier(channel.getResultsDatasetName()), true);
        ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
        tempMdProvider.getLocks().reset();
        dropStmt = new DropDatasetStatement(dataverse, new Identifier(channel.getSubscriptionsDataset()), true);
        ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
        // Only clear the flag once the commit has actually succeeded; previously the flag
        // was dropped right after getChannel, so failures during deleteEntity or the nested
        // dataset drops leaked an open metadata transaction (no abort was performed).
        txnActive = false;
    } catch (Exception e) {
        // Log through the class logger instead of printStackTrace so the failure
        // lands in the cluster logs with the rest of this component's output.
        LOGGER.log(Level.SEVERE, "Failed to drop channel " + channelName.getValue(), e);
        if (e instanceof InterruptedException) {
            // Broad catch may swallow the interrupt from awaitTermination; restore it.
            Thread.currentThread().interrupt();
        }
        if (txnActive) {
            QueryTranslator.abort(e, e, mdTxnCtx);
        }
        throw HyracksDataException.create(e);
    } finally {
        metadataProvider.getLocks().unlock();
    }
}