in asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java [231:291]
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
initialize();
DataverseName dataverse = statementExecutor.getActiveDataverseName(signature.getDataverseName());
EntityId entityId = new EntityId(BADConstants.RUNTIME_ENTITY_PROCEDURE, dataverse, signature.getName());
DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
boolean alreadyActive = false;
Procedure procedure = null;
MetadataTransactionContext mdTxnCtx = null;
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, signature.getName(),
Integer.toString(signature.getArity()));
if (procedure != null) {
throw new AlgebricksException("A procedure with this name " + signature.getName() + " already exists.");
}
if (listener != null) {
alreadyActive = listener.isActive();
}
if (alreadyActive) {
throw new AsterixException("Procedure " + signature.getName() + " is already running");
}
metadataProvider.setResultSetId(new ResultSetId(0));
final Stats stats = requestParameters.getStats();
metadataProvider.setResultAsyncMode(false);
metadataProvider.setMaxResultReads(1);
//Create Procedure Internal Job
Pair<JobSpecification, PrecompiledType> procedureJobSpec =
createProcedureJob(statementExecutor, metadataProvider, hcc, stats);
// Now we subscribe
if (listener == null) {
listener = new DeployedJobSpecEventListener(appCtx, entityId, procedureJobSpec.second);
activeEventHandler.registerListener(listener);
}
setupDeployedJobSpec(entityId, procedureJobSpec.first, hcc, listener, metadataProvider.getResultSetId(),
stats);
procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),
procedureJobSpec.second.toString(), getProcedureBody(), SqlppParserFactory.SQLPP, duration,
dependencies);
MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
if (mdTxnCtx != null) {
QueryTranslator.abort(e, e, mdTxnCtx);
}
LOGGER.log(Level.WARNING, "Failed creating a procedure", e);
throw HyracksDataException.create(e);
} finally {
metadataProvider.getLocks().unlock();
}
}