public void executeStage()

in mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java [267:501]


    /**
     * Runs this worker's stage of the job described by {@code setup} and blocks until the stage finishes.
     *
     * <p>The method builds a {@link RunningWorker} from the execute-stage request, starts the job's
     * {@link Lifecycle}, creates the job {@link Context}, starts heartbeats and then dispatches on the
     * stage number:
     * <ul>
     *   <li>stage 0: starts the Job Master auto-scaler service (V2 or V1) and waits until the worker terminates;</li>
     *   <li>stage 1 of a single-stage job: runs source, stage and sink in this worker and waits until it terminates;</li>
     *   <li>stage 1 of a multi-stage job: serves the source output as a remote observable and blocks until
     *       that server shuts down;</li>
     *   <li>any other stage: delegates to {@code executeNonSourceStage}.</li>
     * </ul>
     * Any {@link Throwable} raised once the worker has been built is logged, reported via
     * {@link RunningWorker#signalFailed(Throwable)}, and followed by {@code shutdownStage()}.
     *
     * @param setup execution details carrying the stage request, status observer, job definition and parameters
     * @throws IOException declared by the signature; presumably propagated from collaborators — verify
     */
    public void executeStage(final ExecutionDetails setup) throws IOException {

        ExecuteStageRequest executionRequest = setup.getExecuteStageRequest().getRequest();

        jobStatusObserver = setup.getStatus();

        // JobSchedulingInfo has metadata around which stage runs on which set of workers.
        // replay(1).refCount() shares one upstream subscription among all interested consumers and
        // hands the most recent scheduling info to late subscribers.
        Observable<JobSchedulingInfo> selfSchedulingInfo =
            mantisMasterApi.schedulingChanges(executionRequest.getJobId())
                .subscribeOn(Schedulers.io())
                .replay(1)
                .refCount()
                .doOnSubscribe(() -> logger.info("mantisApi schedulingChanges subscribe"))
                .doOnUnsubscribe(() -> logger.info("mantisApi schedulingChanges stream unsub."))
                .doOnError(e -> logger.warn("mantisApi schedulingChanges stream error:", e))
                .doOnCompleted(() -> logger.info("mantisApi schedulingChanges stream completed."));
        // represents datastructure that has the current worker information and what it represents in the overall operator DAG
        WorkerInfo workerInfo = generateWorkerInfo(executionRequest.getJobName(), executionRequest.getJobId(),
                executionRequest.getStage(), executionRequest.getWorkerIndex(),
                executionRequest.getWorkerNumber(), executionRequest.getDurationType(), "host", executionRequest.getWorkerPorts());

        // observable that represents the number of workers for the source stage
        final Observable<Integer> sourceStageTotalWorkersObs = createSourceStageTotalWorkersObservable(selfSchedulingInfo);
        RunningWorker.Builder rwBuilder = new RunningWorker.Builder()
                .job(setup.getMantisJob())
                .schedulingInfo(executionRequest.getSchedulingInfo())
                .stageTotalWorkersObservable(sourceStageTotalWorkersObs)
                .jobName(executionRequest.getJobName())
                .stageNum(executionRequest.getStage())
                .workerIndex(executionRequest.getWorkerIndex())
                .workerNum(executionRequest.getWorkerNumber())
                .totalStages(executionRequest.getTotalNumStages())
                .metricsPort(executionRequest.getMetricsPort())
                .ports(executionRequest.getPorts().iterator())
                .jobStatusObserver(setup.getStatus())
                .requestSubject(setup.getExecuteStageRequest().getRequestSubject())
                .workerInfo(workerInfo)
                .hasJobMaster(executionRequest.getHasJobMaster())
                .jobId(executionRequest.getJobId());

        if (executionRequest.getStage() == 0) {
            // Stage 0 is the Job Master; it has no user-defined stage config.
            rwBuilder = rwBuilder.stage(new JobMasterStageConfig("jobmasterconfig"));
        } else {
            // User stages are 1-based while the job's stage list is 0-based.
            rwBuilder = rwBuilder.stage((StageConfig) setup.getMantisJob()
                    .getStages().get(executionRequest.getStage() - 1));
        }
        final RunningWorker rw = rwBuilder.build();

        if (rw.getStageNum() == rw.getTotalStagesNet()) {
            // set up subscription state handler only for sink (last) stage
            setupSubscriptionStateHandler(executionRequest);
        }

        logger.info("Running worker info: {}", rw);

        rw.signalStartedInitiated();

        try {

            logger.info(">>>>>>>>>>>>>>>>Calling lifecycle.startup()");
            Lifecycle lifecycle = rw.getJob().getLifecycle();
            lifecycle.startup();
            ServiceLocator serviceLocator = lifecycle.getServiceLocator();

            if (lookupSpectatorRegistry) {
                try {
                    final Registry spectatorRegistry = serviceLocator.service(Registry.class);
                    SpectatorRegistryFactory.setRegistry(spectatorRegistry);
                } catch (Throwable t) {
                    // Best effort: keep the current registry, but log why the lookup failed.
                    logger.error("failed to init spectator registry using service locator, falling back to {}",
                            SpectatorRegistryFactory.getRegistry().getClass().getCanonicalName(), t);
                }
            }

            // Ensure netty clients' listeners are set. This is redundant to the settings in TaskExecutor to ensure
            // the integration at runtime level.
            MantisNettyEventsListenerFactory mantisNettyEventsListenerFactory = new MantisNettyEventsListenerFactory();
            RxNetty.useMetricListenersFactory(mantisNettyEventsListenerFactory);
            SseWorkerConnection.useMetricListenersFactory(mantisNettyEventsListenerFactory);

            // create job context
            Parameters parameters = ParameterUtils
                    .createContextParameters(rw.getJob().getParameterDefinitions(),
                            setup.getParameters());
            final Context context = generateContext(parameters, serviceLocator, workerInfo, MetricsRegistry.getInstance(),
                    () -> {
                        rw.signalCompleted();
                        // wait for completion signal to go to the master and us getting killed. Upon timeout, exit.
                        try {
                            Thread.sleep(60000);
                        } catch (InterruptedException ie) {
                            // Restore the interrupt flag; the process exits below either way.
                            Thread.currentThread().interrupt();
                            logger.warn("Unexpected exception sleeping: " + ie.getMessage());
                        }
                        System.exit(0);
                    }, createWorkerMapObservable(selfSchedulingInfo, executionRequest.getJobName(), executionRequest.getJobId(), executionRequest.getDurationType()),
                classLoader
            );

            rw.setContext(context);
            // setup heartbeats
            heartbeatRef.set(new Heartbeat(rw.getJobId(),
                    rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum(), config.getTaskExecutorHostName()));
            Closeable heartbeatCloseable = startSendingHeartbeats(rw.getJobStatus(),
                executionRequest.getHeartbeatIntervalSecs());
            closeables.add(heartbeatCloseable);

            // execute stage
            if (rw.getStageNum() == 0) {
                logger.info("JobId: {}, executing Job Master", rw.getJobId());

                final AutoScaleMetricsConfig autoScaleMetricsConfig = createJobMasterAutoScaleMetricsConfig(parameters);

                JobAutoscalerManager jobAutoscalerManager = getJobAutoscalerManagerInstance(serviceLocator);

                // The V2 scaler service is used unless the parameter is explicitly set to false (it defaults to true).
                final Boolean useV2ScalerService = (Boolean) parameters.get(JOB_AUTOSCALE_V2_ENABLED_PARAM, true);
                if (useV2ScalerService) {
                    logger.info("[V2 AUTO-SCALER ENABLED] Using V2 JobAutoScalerService: JobMasterServiceV2");
                    // Build the JobScalerContext
                    JobScalerContext jobScalerContext = JobScalerContext.builder()
                        .jobId(rw.getJobId())
                        .schedInfo(rw.getSchedulingInfo())
                        .workerMetricsClient(workerMetricsClient)
                        .autoScaleMetricsConfig(autoScaleMetricsConfig)
                        .masterClientApi(mantisMasterApi)
                        .context(rw.getContext())
                        .observableOnCompleteCallback(rw.getOnCompleteCallback())
                        .observableOnErrorCallback(rw.getOnErrorCallback())
                        .observableOnTerminateCallback(rw.getOnTerminateCallback())
                        .jobAutoscalerManager(jobAutoscalerManager)
                        .build();

                    logger.info("Creating JobMasterServiceV2 using ComponentClassLoader");
                    final String jmLoaderConfigString = (String) parameters.get(JOB_AUTOSCALE_V2_LOADER_CONFIG_PARAM, "");
                    JobMasterComponentLoader jobMasterComponentLoader = JobMasterComponentLoader.fromAkkaRpc(jmLoaderConfigString);
                    Service jobMasterServiceV2 = jobMasterComponentLoader.createAndStartJobMasterServiceV2(jobScalerContext);
                    logger.info("Created JobMasterServiceV2 using ComponentClassLoader");
                    closeables.add(jobMasterServiceV2::shutdown);
                } else {
                    logger.info("Using V1 JobMasterService");
                    JobMasterService jobMasterService = new JobMasterService(rw.getJobId(), rw.getSchedulingInfo(),
                        workerMetricsClient, autoScaleMetricsConfig, mantisMasterApi, rw.getContext(), rw.getOnCompleteCallback(), rw.getOnErrorCallback(), rw.getOnTerminateCallback(), jobAutoscalerManager);
                    jobMasterService.start();
                    closeables.add(jobMasterService::shutdown);
                }

                signalStarted(rw);
                // block until worker terminates
                rw.waitUntilTerminate();
            } else if (rw.getStageNum() == 1 && rw.getTotalStagesNet() == 1) {
                logger.info("JobId: {}, single stage job, executing entire job", rw.getJobId());
                // single stage, execute entire job on this machine
                PortSelector portSelector = new PortSelector() {
                    @Override
                    public int acquirePort() {
                        return rw.getPorts().next();
                    }
                };
                RxMetrics rxMetrics = new RxMetrics();
                closeables.add(StageExecutors.executeSingleStageJob(rw.getJob().getSource(), rw.getStage(),
                        rw.getJob().getSink(), portSelector, rxMetrics, rw.getContext(),
                        rw.getOnTerminateCallback(), rw.getWorkerIndex(),
                        rw.getSourceStageTotalWorkersObservable(),
                        onSinkSubscribe, onSinkUnsubscribe,
                        rw.getOnCompleteCallback(), rw.getOnErrorCallback()));
                signalStarted(rw);
                // block until worker terminates
                rw.waitUntilTerminate();
            } else {
                logger.info("JobId: {}, executing a multi-stage job, stage: {}", rw.getJobId(), rw.getStageNum());
                if (rw.getStageNum() == 1) {

                    // execute source stage
                    String remoteObservableName = rw.getJobId() + "_" + rw.getStageNum();

                    WorkerPublisherRemoteObservable publisher
                            = new WorkerPublisherRemoteObservable<>(rw.getPorts().next(),
                            remoteObservableName, numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1),
                            rw.getJobName(), getRouterFactoryInstance(serviceLocator));

                    closeables.add(StageExecutors.executeSource(rw.getWorkerIndex(), rw.getJob().getSource(),
                            rw.getStage(), publisher, rw.getContext(), rw.getSourceStageTotalWorkersObservable()));

                    logger.info("JobId: {} stage: {}, serving remote observable for source with name: {}",
                            rw.getJobId(), rw.getStageNum(), remoteObservableName);
                    RemoteRxServer server = publisher.getServer();
                    RxMetrics rxMetrics = server.getMetrics();
                    MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges());

                    signalStarted(rw);
                    logger.info("JobId: {} stage: {}, blocking until source observable completes",
                            rw.getJobId(), rw.getStageNum());
                    server.blockUntilServerShutdown();
                } else {
                    // execute intermediate stage or last stage plus sink
                    executeNonSourceStage(selfSchedulingInfo, rw, serviceLocator);
                }
            }
            logger.info("Calling lifecycle.shutdown()");
            lifecycle.shutdown();
        } catch (Throwable t) {
            // NOTE(review): lifecycle.shutdown() is not called on this path; presumably shutdownStage()
            // performs the required cleanup — verify.
            logger.warn("Error during executing stage; shutting down.", t);
            rw.signalFailed(t);
            shutdownStage();
        }
    }

    /**
     * Builds the Job Master's auto-scale metrics configuration from the
     * {@code JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM} job parameter.
     *
     * <p>Temporary workaround to enable auto-scaling by a custom metric in the Job Master; this will be
     * revisited to get the entire autoscaling config for a job as a System parameter in the JobMaster.
     * When the parameter is null or empty, a default {@link AutoScaleMetricsConfig} is returned. Otherwise it
     * must have the form {@code metricGroup::metricName::algo} (empty segments are ignored and segments are
     * trimmed), where {@code algo} names an {@link AutoScaleMetricsConfig.AggregationAlgo} constant.
     *
     * @param parameters the job's context parameters
     * @return the auto-scale metrics config, including the user-defined metric when one is configured
     * @throws RuntimeException if the parameter does not have exactly three segments or names an unknown algorithm
     */
    private AutoScaleMetricsConfig createJobMasterAutoScaleMetricsConfig(final Parameters parameters) {
        final AutoScaleMetricsConfig autoScaleMetricsConfig = new AutoScaleMetricsConfig();

        final String autoScaleMetricString = (String) parameters.get(JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM, "");
        if (Strings.isNullOrEmpty(autoScaleMetricString)) {
            logger.info("param {} is null or empty", JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM);
            return autoScaleMetricsConfig;
        }

        final List<String> tokens = Splitter.on("::").omitEmptyStrings().trimResults().splitToList(autoScaleMetricString);
        if (tokens.size() != 3) {
            final String errorMsg = String.format("ERROR: Invalid value %s for param %s", autoScaleMetricString, JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM);
            logger.error(errorMsg);
            throw new RuntimeException(errorMsg);
        }

        final String metricGroup = tokens.get(0);
        final String metricName = tokens.get(1);
        final String algo = tokens.get(2);
        final AutoScaleMetricsConfig.AggregationAlgo aggregationAlgo;
        try {
            aggregationAlgo = AutoScaleMetricsConfig.AggregationAlgo.valueOf(algo);
        } catch (IllegalArgumentException e) {
            // Report the offending algorithm token (not the config object) and keep the original cause.
            final String errorMsg = String.format("ERROR: Invalid algorithm value %s for param %s (algo should be one of %s)",
                    algo, JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM,
                    Arrays.stream(AutoScaleMetricsConfig.AggregationAlgo.values())
                            .map(AutoScaleMetricsConfig.AggregationAlgo::name)
                            .collect(Collectors.toList()));
            logger.error(errorMsg);
            throw new RuntimeException(errorMsg, e);
        }
        logger.info("registered UserDefined auto scale metric {}:{} algo {}", metricGroup, metricName, aggregationAlgo);
        autoScaleMetricsConfig.addUserDefinedMetric(metricGroup, metricName, aggregationAlgo);
        return autoScaleMetricsConfig;
    }