private void executeNonSourceStage()

in mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java [558:634]


    private void executeNonSourceStage(Observable<JobSchedulingInfo> selfSchedulingInfo, final RunningWorker rw, ServiceLocator serviceLocator) {
        {
            // execute either intermediate (middle) stage or last+sink
            StageConfig previousStageExecuting = (StageConfig) rw.getJob().getStages()
                    .get(rw.getStageNum() - 2); // note, stages are zero indexed

            StageSchedulingInfo previousSchedulingInfo = rw.getSchedulingInfo().forStage(rw.getStageNum() - 1);
            int numInstanceAtPreviousStage = previousSchedulingInfo.getNumberOfInstances();
            AtomicBoolean acceptSchedulingChanges = new AtomicBoolean(true);
            WorkerConsumer consumer = connectToObservableAtPreviousStages(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() - 1,
                    numInstanceAtPreviousStage, previousStageExecuting, acceptSchedulingChanges,
                    rw.getJobStatus(), rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum(), serviceLocator);
            final int workerPort = rw.getPorts().next();

            if (rw.getStageNum() == rw.getTotalStagesNet()) {
                // last+sink stage
                logger.info(
                    "JobId: {}, executing sink stage: {}, signaling started", rw.getJobId(), rw.getStageNum());
                rw.getJobStatus().onNext(new Status(rw.getJobId(), rw.getStageNum(), rw.getWorkerIndex(),
                        rw.getWorkerNum(),
                        TYPE.INFO, String.format(STATUS_MESSAGE_FORMAT, rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum(), "running"),
                        MantisJobState.Started));

                PortSelector portSelector = new PortSelector() {
                    @Override
                    public int acquirePort() {
                        return workerPort;
                    }
                };
                RxMetrics rxMetrics = new RxMetrics();
                MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges());
                final CountDownLatch blockUntilComplete = new CountDownLatch(1);
                Action0 countDownLatch = new Action0() {
                    @Override
                    public void call() {
                        blockUntilComplete.countDown();
                    }
                };
                closeables.add(StageExecutors.executeSink(consumer, rw.getStage(),
                        rw.getJob().getSink(), portSelector, rxMetrics,
                        rw.getContext(), countDownLatch, onSinkSubscribe, onSinkUnsubscribe,
                        rw.getOnCompleteCallback(), rw.getOnErrorCallback()));
                // block until completes
                try {
                    blockUntilComplete.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                acceptSchedulingChanges.set(false);
            } else {
                // intermediate stage
                logger.info("JobId: " + rw.getJobId() + ", executing intermediate stage: " + rw.getStageNum());

                int stageNumToExecute = rw.getStageNum();
                String jobId = rw.getJobId();
                String remoteObservableName = jobId + "_" + stageNumToExecute;

                WorkerPublisherRemoteObservable publisher
                        = new WorkerPublisherRemoteObservable<>(workerPort, remoteObservableName,
                        numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1), rw.getJobName(),
                        getRouterFactoryInstance(serviceLocator)
                    );
                closeables.add(StageExecutors.executeIntermediate(consumer, rw.getStage(), publisher,
                        rw.getContext()));
                RemoteRxServer server = publisher.getServer();

                logger.info("JobId: " + jobId + " stage: " + stageNumToExecute + ", serving intermediate remote observable with name: " + remoteObservableName);
                RxMetrics rxMetrics = server.getMetrics();
                MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges());
                // send running signal only after server is started
                signalStarted(rw);
                logger.info("JobId: " + jobId + " stage: " + stageNumToExecute + ", blocking until intermediate observable completes");
                server.blockUntilServerShutdown();
                acceptSchedulingChanges.set(false);
            }
        }
    }