in asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java [94:176]
/**
 * Drops any lingering {@link DeployedJobSpecEventListener}s and then redeploys the job
 * specification of every given channel and procedure, registering a fresh (suspended)
 * listener for each entity.
 *
 * <p>Channels additionally get a new scheduled executor and have their repetitive job
 * restarted. Procedures are only redeployed; their repetitive instances are not restarted
 * (see the TODO at the end of the procedure loop).
 *
 * <p>The locks of each per-entity {@link MetadataProvider} are released in a {@code finally}
 * block, so a failure while redeploying or starting one entity does not leave them locked.
 *
 * @param appCtx     application context used to build the statement executor, the metadata
 *                   providers and the listeners
 * @param channels   channels whose job specs are redeployed and restarted
 * @param procedures procedures whose job specs are redeployed
 * @throws Exception propagated from redeploying or starting a job; entities after the
 *                   failing one are not processed
 */
private void deployJobs(ICcApplicationContext appCtx, List<Channel> channels, List<Procedure> procedures)
        throws Exception {
    SessionConfig sessionConfig =
            new SessionConfig(SessionConfig.OutputFormat.ADM, true, true, true, SessionConfig.PlanFormat.STRING);
    final SessionOutput sessionOutput = new SessionOutput(sessionConfig, null);
    BADQueryTranslator badStatementExecutor =
            new BADQueryTranslator(appCtx, new ArrayList<>(), sessionOutput, new BADCompilationProvider(),
                    Executors.newSingleThreadExecutor(
                            new HyracksThreadFactory(DefaultStatementExecutorFactory.class.getSimpleName())),
                    new ResponsePrinter(sessionOutput));
    ActiveNotificationHandler activeEventHandler =
            (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();

    // Remove any lingering listeners, shutting down the executor each one may still own.
    for (IActiveEntityEventsListener listener : activeEventHandler.getEventListeners()) {
        if (listener instanceof DeployedJobSpecEventListener) {
            DeployedJobSpecEventListener deployedListener = (DeployedJobSpecEventListener) listener;
            if (deployedListener.getExecutorService() != null) {
                deployedListener.getExecutorService().shutdown();
            }
            activeEventHandler.unregisterListener(listener);
        }
    }

    // Redeploy channels: register a suspended listener, redeploy the job spec, restart the repetitive job.
    for (Channel channel : channels) {
        EntityId entityId = channel.getChannelId();
        MetadataProvider metadataProvider =
                MetadataProvider.create(appCtx, MetadataBuiltinEntities.DEFAULT_DATAVERSE);
        try {
            // A channel with an empty results dataset name is treated as a push channel.
            DeployedJobSpecEventListener listener =
                    new DeployedJobSpecEventListener(appCtx, entityId, channel.getResultsDatasetName().equals("")
                            ? PrecompiledType.PUSH_CHANNEL : PrecompiledType.CHANNEL);
            listener.suspend();
            activeEventHandler.registerListener(listener);
            RequestReference requestReference =
                    RequestReference.of(UUID.randomUUID().toString(), "CC", System.currentTimeMillis());
            BADJobService.redeployJobSpec(entityId, channel.getChannelBody(), metadataProvider,
                    badStatementExecutor, hcc,
                    new RequestParameters(requestReference, null, null, null, null, null, null, null, null, null,
                            true),
                    true);
            ScheduledExecutorService ses = BADJobService.createExecutorServe();
            listener.setExecutorService(ses);
            BADJobService.startRepetitiveDeployedJobSpec(ses, listener.getDeployedJobSpecId(), hcc,
                    BADJobService.findPeriod(channel.getDuration()), new HashMap<>(), entityId,
                    metadataProvider.getTxnIdFactory(), listener);
        } finally {
            // Always release this provider's locks, including when redeploy/start throws.
            metadataProvider.getLocks().unlock();
        }
        LOGGER.log(Level.SEVERE, entityId.getExtensionName() + " " + entityId.getDataverseName() + "."
                + entityId.getEntityName() + " was stopped by cluster failure. It has restarted.");
    }

    // Redeploy procedures: register a suspended listener and redeploy the job spec only.
    for (Procedure procedure : procedures) {
        EntityId entityId = procedure.getEntityId();
        MetadataProvider metadataProvider =
                MetadataProvider.create(appCtx, MetadataBuiltinEntities.DEFAULT_DATAVERSE);
        try {
            metadataProvider.setWriterFactory(PrinterBasedWriterFactory.INSTANCE);
            metadataProvider.setResultSerializerFactoryProvider(ResultSerializerFactoryProvider.INSTANCE);
            DeployedJobSpecEventListener listener =
                    new DeployedJobSpecEventListener(appCtx, entityId, PrecompiledType.valueOf(procedure.getType()));
            listener.suspend();
            activeEventHandler.registerListener(listener);
            RequestReference requestReference =
                    RequestReference.of(UUID.randomUUID().toString(), "CC", System.currentTimeMillis());
            BADJobService.redeployJobSpec(entityId, procedure.getBody(), metadataProvider, badStatementExecutor,
                    hcc,
                    new RequestParameters(requestReference, null,
                            new ResultSet(hcc,
                                    appCtx.getServiceContext().getControllerService().getNetworkSecurityManager()
                                            .getSocketChannelFactory(),
                                    appCtx.getCompilerProperties().getFrameSize(), ResultReader.NUM_READERS),
                            new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
                            new IStatementExecutor.Stats(), new IStatementExecutor.StatementProperties(), null,
                            null, null, null, true),
                    true);
        } finally {
            // Always release this provider's locks, including when redeploy throws.
            metadataProvider.getLocks().unlock();
        }
        //Log that the procedure stopped by cluster restart. Procedure is available again now.
        LOGGER.log(Level.SEVERE, entityId.getExtensionName() + " " + entityId.getDataverseName() + "."
                + entityId.getEntityName()
                + " was lost with cluster failure and any repetitive instances have stopped. It is now available to run again.");
        //TODO: allow repetitive procedures to restart execution automatically
        //Issue: need to store in metadata the information for running instances
    }
}