private void deployJobs()

in asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java [94:176]


    private void deployJobs(ICcApplicationContext appCtx, List<Channel> channels, List<Procedure> procedures)
            throws Exception {
        SessionConfig sessionConfig =
                new SessionConfig(SessionConfig.OutputFormat.ADM, true, true, true, SessionConfig.PlanFormat.STRING);
        final SessionOutput sessionOutput = new SessionOutput(sessionConfig, null);
        BADQueryTranslator badStatementExecutor =
                new BADQueryTranslator(appCtx, new ArrayList<>(), sessionOutput, new BADCompilationProvider(),
                        Executors.newSingleThreadExecutor(
                                new HyracksThreadFactory(DefaultStatementExecutorFactory.class.getSimpleName())),
                        new ResponsePrinter(sessionOutput));

        ActiveNotificationHandler activeEventHandler =
                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();

        //Remove any lingering listeners
        for (IActiveEntityEventsListener listener : activeEventHandler.getEventListeners()) {
            if (listener instanceof DeployedJobSpecEventListener) {
                if (((DeployedJobSpecEventListener) listener).getExecutorService() != null) {
                    ((DeployedJobSpecEventListener) listener).getExecutorService().shutdown();
                }
                activeEventHandler.unregisterListener(listener);
            }
        }

        MetadataProvider metadataProvider;

        //Redeploy Jobs
        for (Channel channel : channels) {
            EntityId entityId = channel.getChannelId();
            metadataProvider = MetadataProvider.create(appCtx, MetadataBuiltinEntities.DEFAULT_DATAVERSE);
            DeployedJobSpecEventListener listener =
                    new DeployedJobSpecEventListener(appCtx, entityId, channel.getResultsDatasetName().equals("")
                            ? PrecompiledType.PUSH_CHANNEL : PrecompiledType.CHANNEL);
            listener.suspend();
            activeEventHandler.registerListener(listener);
            RequestReference requestReference =
                    RequestReference.of(UUID.randomUUID().toString(), "CC", System.currentTimeMillis());
            BADJobService.redeployJobSpec(entityId, channel.getChannelBody(), metadataProvider, badStatementExecutor,
                    hcc,
                    new RequestParameters(requestReference, null, null, null, null, null, null, null, null, null, true),
                    true);

            ScheduledExecutorService ses = BADJobService.createExecutorServe();
            listener.setExecutorService(ses);
            BADJobService.startRepetitiveDeployedJobSpec(ses, listener.getDeployedJobSpecId(), hcc,
                    BADJobService.findPeriod(channel.getDuration()), new HashMap<>(), entityId,
                    metadataProvider.getTxnIdFactory(), listener);
            metadataProvider.getLocks().unlock();

            LOGGER.log(Level.SEVERE, entityId.getExtensionName() + " " + entityId.getDataverseName() + "."
                    + entityId.getEntityName() + " was stopped by cluster failure. It has restarted.");

        }
        for (Procedure procedure : procedures) {
            EntityId entityId = procedure.getEntityId();
            metadataProvider = MetadataProvider.create(appCtx, MetadataBuiltinEntities.DEFAULT_DATAVERSE);
            metadataProvider.setWriterFactory(PrinterBasedWriterFactory.INSTANCE);
            metadataProvider.setResultSerializerFactoryProvider(ResultSerializerFactoryProvider.INSTANCE);
            DeployedJobSpecEventListener listener =
                    new DeployedJobSpecEventListener(appCtx, entityId, PrecompiledType.valueOf(procedure.getType()));
            listener.suspend();
            activeEventHandler.registerListener(listener);
            RequestReference requestReference =
                    RequestReference.of(UUID.randomUUID().toString(), "CC", System.currentTimeMillis());
            BADJobService.redeployJobSpec(entityId, procedure.getBody(), metadataProvider, badStatementExecutor, hcc,
                    new RequestParameters(requestReference, null,
                            new ResultSet(hcc,
                                    appCtx.getServiceContext().getControllerService().getNetworkSecurityManager()
                                            .getSocketChannelFactory(),
                                    appCtx.getCompilerProperties().getFrameSize(), ResultReader.NUM_READERS),
                            new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
                            new IStatementExecutor.Stats(), new IStatementExecutor.StatementProperties(), null, null,
                            null, null, true),
                    true);
            metadataProvider.getLocks().unlock();
            //Log that the procedure stopped by cluster restart. Procedure is available again now.
            LOGGER.log(Level.SEVERE, entityId.getExtensionName() + " " + entityId.getDataverseName() + "."
                    + entityId.getEntityName()
                    + " was lost with cluster failure and any repetitive instances have stopped. It is now available to run again.");
            //TODO: allow repetitive procedures to restart execution automatically
            //Issue: need to store in metadata the information for running instances
        }
    }