in asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java [124:155]
/**
 * Runs a previously deployed job specification once, waits for it to finish, and reports
 * how long the run took.
 *
 * <p>Before the job is started, {@code listener.waitWhileAtState(ActivityState.SUSPENDED)} is
 * invoked, and a freshly created transaction id is stored in {@code jobParameters} under
 * {@link BADConstants#TRANSACTION_ID_PARAMETER_NAME}. Note that the caller's map is mutated.
 *
 * <p>When the listener's type is {@code PrecompiledType.QUERY}, the job's results are read
 * (result set id 0) and printed through the response printer of {@code statementExecutor}.
 *
 * @param distributedId     id of the deployed job specification to start
 * @param hcc               client connection used to start the job and wait for its completion
 * @param resultSet         result set the query results are read from (used for QUERY listeners only)
 * @param jobParameters     job parameters passed to the job; the transaction id entry is added to it
 * @param entityId          entity the job belongs to; used only for the completion log message
 * @param txnIdFactory      factory that creates the transaction id for this run
 * @param appCtx            application context handed to the results printer
 * @param listener          listener consulted for the suspended-state wait and for the precompiled type
 * @param statementExecutor supplies the response printer and session output for QUERY listeners
 * @return wall-clock duration, in milliseconds, between starting the job and its completion
 * @throws Exception if waiting on the listener, starting the job, waiting for completion,
 *                   or printing the results fails
 */
public static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
        IResultSet resultSet, Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
        ICcApplicationContext appCtx, DeployedJobSpecEventListener listener, QueryTranslator statementExecutor)
        throws Exception {
    listener.waitWhileAtState(ActivityState.SUSPENDED);

    // Add the Asterix Transaction Id to the map. The charset is spelled out so the encoded
    // bytes do not depend on the platform default encoding of the JVM running this code.
    jobParameters.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
            String.valueOf(txnIdFactory.create().getId()).getBytes(java.nio.charset.StandardCharsets.UTF_8));

    long startTime = Instant.now().toEpochMilli();
    JobId jobId = hcc.startJob(distributedId, jobParameters);
    hcc.waitForCompletion(jobId);
    long executionMilliseconds = Instant.now().toEpochMilli() - startTime;

    // Only QUERY-type listeners have results to hand back to the client session.
    if (listener.getType() == DeployedJobSpecEventListener.PrecompiledType.QUERY) {
        ResultReader resultReader = new ResultReader(resultSet, jobId, new ResultSetId(0));
        IResponsePrinter printer = statementExecutor.getResponsePrinter();
        printer.addResultPrinter(new ResultsPrinter(appCtx, resultReader, null, new IStatementExecutor.Stats(),
                statementExecutor.getSessionOutput()));
        printer.printResults();
    }

    // A successful completion is routine information, not an error, so it is logged at INFO.
    LOGGER.log(Level.INFO,
            "Deployed Job execution completed for " + entityId.getExtensionName() + " "
                    + entityId.getDataverseName() + "." + entityId.getEntityName() + ". Took "
                    + executionMilliseconds + " milliseconds ");
    return executionMilliseconds;
}