in asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java [104:149]
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
DataverseName dataverse = statementExecutor.getActiveDataverseName(dataverseName);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.RUNTIME_ENTITY_PROCEDURE, dataverse, procedureName);
DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
Procedure procedure;
MetadataTransactionContext mdTxnCtx = null;
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
txnActive = true;
procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, procedureName, Integer.toString(getArity()));
if (procedure == null) {
throw new AlgebricksException("There is no procedure with this name " + procedureName + ".");
}
Map<byte[], byte[]> contextRuntimeVarMap = createParameterMap(procedure);
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
if (procedure.getDuration().equals("")) {
BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, requestParameters.getResultSet(),
contextRuntimeVarMap, entityId, metadataProvider.getTxnIdFactory(), appCtx, listener,
(QueryTranslator) statementExecutor);
} else {
ScheduledExecutorService ses = BADJobService.createExecutorServe();
listener.setExecutorService(ses);
BADJobService.startRepetitiveDeployedJobSpec(ses, deployedJobSpecId, hcc,
BADJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId,
metadataProvider.getTxnIdFactory(), listener);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
txnActive = false;
} catch (Exception e) {
e.printStackTrace();
if (txnActive) {
QueryTranslator.abort(e, e, mdTxnCtx);
}
throw HyracksDataException.create(e);
} finally {
metadataProvider.getLocks().unlock();
}
}