in asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java [208:258]
public static void redeployJobSpec(EntityId entityId, String queryBodyString, MetadataProvider metadataProvider,
BADQueryTranslator badStatementExecutor, IHyracksClientConnection hcc, IRequestParameters requestParameters,
boolean useNewId) throws Exception {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
if (listener == null) {
LOGGER.severe("Tried to redeploy the job for " + entityId + " but no listener exists.");
return;
}
BADParserFactory factory = new BADParserFactory();
List<Statement> fStatements = factory.createParser(queryBodyString).parse();
JobSpecification jobSpec = null;
if (listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)
|| listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.CHANNEL)) {
//Channels
SetStatement ss = (SetStatement) fStatements.get(0);
metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
if (listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)) {
jobSpec = compilePushChannel(badStatementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
} else {
jobSpec = badStatementExecutor.handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc,
null, null, null, null, true, null, null, null);
}
} else {
//Procedures
metadataProvider.setResultSetId(new ResultSetId(0));
IStatementExecutor.ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
IResultSet hdc = requestParameters.getResultSet();
IStatementExecutor.Stats stats = requestParameters.getStats();
boolean resultsAsync = resultDelivery == IStatementExecutor.ResultDelivery.ASYNC
|| resultDelivery == IStatementExecutor.ResultDelivery.DEFERRED;
metadataProvider.setResultAsyncMode(resultsAsync);
metadataProvider.setMaxResultReads(1);
jobSpec = compileProcedureJob(badStatementExecutor, metadataProvider, hcc, hdc, stats, fStatements.get(1));
}
if (useNewId) {
DeployedJobSpecId id = hcc.deployJobSpec(jobSpec);
listener.setDeployedJobSpecId(id);
} else {
hcc.redeployJobSpec(listener.getDeployedJobSpecId(), jobSpec);
}
listener.resume();
}