in asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java [83:155]
/**
 * Drops the procedure identified by this statement's function signature.
 *
 * <p>The signature's dataverse is first resolved through
 * {@code statementExecutor.getActiveDataverseName}. If a listener is registered for the procedure
 * and reports itself active, the drop is refused. Otherwise, inside a metadata transaction, the
 * procedure is looked up; if a listener exists its executor service is shut down, it is
 * deactivated and unregistered, and its deployed job spec (if any) is undeployed; finally the
 * procedure's metadata entity is deleted and the transaction committed.
 *
 * <p>The locks returned by {@code metadataProvider.getLocks()} are unlocked in a {@code finally}
 * clause once the metadata transaction section has been entered. The "procedure is running"
 * check happens before that section, so that early exit does not unlock them.
 *
 * @param hcc client connection used to undeploy the procedure's deployed job spec
 * @param statementExecutor used to resolve the active dataverse name
 * @param requestParameters not used by this method
 * @param metadataProvider supplies the application context and the locks to unlock
 * @param resultSetId not used by this method
 * @throws AlgebricksException if the procedure's listener is currently active
 * @throws HyracksDataException for any failure inside the metadata transaction section
 *         (including the "no procedure with this name" case when {@code ifExists} is false);
 *         the underlying exception is passed to {@code HyracksDataException.create}
 */
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
        IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
        throws HyracksDataException, AlgebricksException {
    ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
    ActiveNotificationHandler activeEventHandler =
            (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
    FunctionSignature signature = getFunctionSignature();
    DataverseName dataverseName = statementExecutor.getActiveDataverseName(signature.getDataverseName());
    signature.setDataverseName(dataverseName);
    EntityId entityId = new EntityId(BADConstants.RUNTIME_ENTITY_PROCEDURE, dataverseName, signature.getName());
    DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
    // The listener may be absent (handled further down), so guard the activity check against null
    // instead of failing with a NullPointerException.
    if (listener != null && listener.isActive()) {
        throw new AlgebricksException("Cannot drop running procedure. There are " + listener.getRunningInstance()
                + " running instances.");
    }
    // Stays true from beginTransaction until a commit succeeds, so every failure in between
    // aborts the metadata transaction rather than leaving it open.
    boolean txnActive = false;
    MetadataTransactionContext mdTxnCtx = null;
    try {
        mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
        txnActive = true;
        Procedure procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverseName, signature.getName(),
                Integer.toString(signature.getArity()));
        if (procedure == null) {
            if (ifExists) {
                // Nothing to drop and the caller asked for IF EXISTS semantics: finish quietly.
                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                txnActive = false;
                return;
            }
            throw new AlgebricksException("There is no procedure with this name " + signature.getName() + ".");
        }
        String entityDescription = entityId.getExtensionName() + " " + entityId.getDataverseName() + "."
                + entityId.getEntityName();
        if (listener == null) {
            //TODO: Channels need to better handle cluster failures
            LOGGER.log(Level.SEVERE,
                    "Tried to drop a Deployed Job whose listener no longer exists: " + entityDescription + ".");
        } else {
            if (listener.getExecutorService() != null) {
                listener.getExecutorService().shutdown();
                // Give already-submitted work a bounded amount of time to finish.
                if (!listener.getExecutorService().awaitTermination(BADConstants.EXECUTOR_TIMEOUT,
                        TimeUnit.SECONDS)) {
                    LOGGER.log(Level.SEVERE,
                            "Executor Service is terminating non-gracefully for: " + entityDescription);
                }
            }
            // Read the job spec id before deactivating the listener.
            DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
            listener.deActivate();
            activeEventHandler.unregisterListener(listener);
            if (deployedJobSpecId != null) {
                hcc.undeployJobSpec(deployedJobSpecId);
            }
        }
        // Remove the procedure metadata
        MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, procedure);
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
        txnActive = false;
    } catch (Exception e) {
        LOGGER.log(Level.SEVERE, "Failed to drop procedure " + signature.getName(), e);
        if (txnActive) {
            QueryTranslator.abort(e, e, mdTxnCtx);
        }
        throw HyracksDataException.create(e);
    } finally {
        metadataProvider.getLocks().unlock();
    }
}