public static void execute()

in mantis-runtime/src/main/java/io/mantisrx/runtime/executor/LocalJobExecutorNetworked.java [162:385]


    /**
     * Validates the given job and runs it in the current JVM, blocking until every instance of the
     * last (sink) stage has signalled termination.
     *
     * <p>A metrics server is started on a port taken from the 8000-9000 range and the job's
     * {@link Lifecycle} is started before any stage runs. Both are shut down when this method
     * returns, including when it exits with an exception.
     *
     * <p>Jobs with exactly one stage are run through {@code StageExecutors.executeSingleStageJob};
     * otherwise the first stage is started as a source, the middle stages as intermediates and the
     * last stage as the sink.
     *
     * @param job            the job to validate and run
     * @param schedulingInfo supplies the number of instances per stage (stages are numbered from 1)
     * @param parameters     parameter values used to build each worker's context parameters
     * @throws IllegalMantisJobException if job validation fails
     * @throws RuntimeException          if the calling thread is interrupted while waiting for the
     *                                   job to finish (the interrupt flag is restored first)
     */
    public static void execute(Job job, SchedulingInfo schedulingInfo, Parameter... parameters) throws IllegalMantisJobException {
        // validate job
        try {
            new ValidateJob(job).execute();
        } catch (CommandException e) {
            throw new IllegalMantisJobException(e);
        }

        final PortSelector portSelector = new PortSelectorInRange(8000, 9000);

        // register netty metrics
        RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory());
        // start our metrics server
        MetricsServer metricsServer = new MetricsServer(portSelector.acquirePort(), 1, Collections.EMPTY_MAP);
        metricsServer.start();
        try {
            Lifecycle lifecycle = job.getLifecycle();
            lifecycle.startup();
            try {
                runLocalStages(job, schedulingInfo, lifecycle, portSelector, parameters);
            } finally {
                // release the lifecycle even if a stage failed to start or the wait was interrupted
                lifecycle.shutdown();
            }
        } finally {
            metricsServer.shutdown();
        }
    }

    /** Creates the per-worker contexts, starts every stage of the job and waits for the last stage to finish. */
    private static void runLocalStages(Job job, SchedulingInfo schedulingInfo, Lifecycle lifecycle,
                                       PortSelector portSelector, Parameter... parameters) throws IllegalMantisJobException {
        List<StageConfig> stages = job.getStages();
        final SourceHolder source = job.getSource();
        final SinkHolder sink = job.getSink();

        // create job context
        Map parameterDefinitions = job.getParameterDefinitions();
        final String user = Optional.ofNullable(MantisProperties.getProperty("USER")).orElse(
            "userUnknown");
        String jobId = String.format("localJob-%s-%d", user, (int) (Math.random() * 10000));
        logger.info("jobID {}", jobId);
        final ServiceLocator serviceLocator = lifecycle.getServiceLocator();

        int stageOneInstances = schedulingInfo.forStage(1).getNumberOfInstances();
        BehaviorSubject<Integer> workersInStageOneObservable = BehaviorSubject.create(stageOneInstances);
        BehaviorSubject<WorkerMap> workerMapObservable = BehaviorSubject.create();

        // callbacks that intentionally do nothing
        Action0 nullOnCompleted = () -> {};
        Action1<Throwable> nullOnError = t -> {};

        Map<Integer, List<WorkerInfo>> workerInfoMap = new HashMap<>();

        if (stages.size() == 1) {
            // single stage job
            final StageConfig stage = stages.get(0);

            // use latch to wait for all instances to complete
            final CountDownLatch waitUntilAllCompleted = new CountDownLatch(stageOneInstances);
            Action0 countDownLatchOnComplete = waitUntilAllCompleted::countDown;

            // workers for stage 1; the list is filled as the instances are created below
            List<WorkerInfo> workerInfoList = new ArrayList<>();
            workerInfoMap.put(1, workerInfoList);

            // run for num of instances
            for (int i = 0; i < stageOneInstances; i++) {
                WorkerInfo workerInfo = new WorkerInfo(jobId, jobId, 1, i, i + 1, MantisJobDurationType.Perpetual,
                        "localhost", newLocalWorkerPorts(portSelector));
                workerInfoList.add(workerInfo);
                Context context = new Context(
                        ParameterUtils.createContextParameters(parameterDefinitions, parameters),
                        serviceLocator,
                        workerInfo,
                        MetricsRegistry.getInstance(),
                        () -> {
                            System.exit(0);
                        },
                        workerMapObservable,
                        Thread.currentThread().getContextClassLoader());

                // publish the workers known so far before starting this instance
                workerMapObservable.onNext(new WorkerMap(workerInfoMap));

                StageExecutors.executeSingleStageJob(source, stage, sink, () -> workerInfo.getWorkerPorts().getSinkPort(), new RxMetrics(),
                        context, countDownLatchOnComplete, i, workersInStageOneObservable, null, null, nullOnCompleted,
                        nullOnError);
            }

            // wait for all instances to complete
            awaitAllInstances(waitUntilAllCompleted);
            return;
        }

        // multi-stage job
        // Single counter shared by every stage so that worker numbers are unique across the job.
        // It starts at 1, which keeps stage 1 workers numbered 1..n.
        int workerNumber = 1;

        // start source stages
        StageConfig currentStage = stages.get(0);
        StageSchedulingInfo currentStageScalingInfo = schedulingInfo.forStage(1);
        StageSchedulingInfo nextStageScalingInfo = schedulingInfo.forStage(2);
        int[] previousPorts = new int[currentStageScalingInfo.getNumberOfInstances()]; // num ports

        List<WorkerInfo> workerInfoList = new ArrayList<>();
        for (int i = 0; i < currentStageScalingInfo.getNumberOfInstances(); i++) {
            WorkerInfo workerInfo = new WorkerInfo(jobId, jobId, 1, i, workerNumber++, MantisJobDurationType.Perpetual,
                    "localhost", newLocalWorkerPorts(portSelector));
            workerInfoList.add(workerInfo);

            // the next stage connects to this worker's sink port
            int sourcePort = workerInfo.getWorkerPorts().getSinkPort();
            previousPorts[i] = sourcePort;
            Context context = new Context(
                    ParameterUtils.createContextParameters(parameterDefinitions, parameters),
                    serviceLocator, workerInfo,
                    MetricsRegistry.getInstance(), nullAction, workerMapObservable,
                    Thread.currentThread().getContextClassLoader());

            startSource(i, sourcePort, nextStageScalingInfo.getNumberOfInstances(),
                    source, currentStage, context, workersInStageOneObservable);
        }
        // workers for stage 1
        workerInfoMap.put(1, workerInfoList);
        workerMapObservable.onNext(new WorkerMap(workerInfoMap));

        // start intermediate stages, all but last stage
        for (int i = 1; i < stages.size() - 1; i++) {
            StageSchedulingInfo previousStageScalingInfo = schedulingInfo.forStage(i);
            currentStageScalingInfo = schedulingInfo.forStage(i + 1); // stages indexed starting at 1
            currentStage = stages.get(i);
            nextStageScalingInfo = schedulingInfo.forStage(i + 2); // stages indexed starting at 1
            int[] currentPorts = new int[currentStageScalingInfo.getNumberOfInstances()];
            workerInfoList = new ArrayList<>();
            for (int j = 0; j < currentStageScalingInfo.getNumberOfInstances(); j++) {
                WorkerInfo workerInfo = new WorkerInfo(jobId, jobId, i + 1, j, workerNumber++, MantisJobDurationType.Perpetual,
                        "localhost", newLocalWorkerPorts(portSelector));
                workerInfoList.add(workerInfo);
                int port = workerInfo.getWorkerPorts().getSinkPort();
                currentPorts[j] = port;

                Context context = new Context(
                        ParameterUtils.createContextParameters(parameterDefinitions, parameters),
                        serviceLocator, workerInfo,
                        MetricsRegistry.getInstance(), nullAction, workerMapObservable,
                        Thread.currentThread().getContextClassLoader());

                startIntermediate(previousPorts, port, currentStage, context, j,
                        nextStageScalingInfo.getNumberOfInstances(), i, previousStageScalingInfo.getNumberOfInstances());
            }
            // workers for current stage
            workerInfoMap.put(i + 1, workerInfoList);
            workerMapObservable.onNext(new WorkerMap(workerInfoMap));

            previousPorts = currentPorts;
        }

        // start sink stage
        StageSchedulingInfo previousStageScalingInfo = schedulingInfo.forStage(stages.size() - 1);
        StageConfig previousStage = stages.get(stages.size() - 2);
        currentStage = stages.get(stages.size() - 1);
        currentStageScalingInfo = schedulingInfo.forStage(stages.size());
        int sinkInstances = currentStageScalingInfo.getNumberOfInstances();

        // use latch to wait for all instances to complete
        final CountDownLatch waitUntilAllCompleted = new CountDownLatch(sinkInstances);
        Action0 countDownLatchOnTerminated = waitUntilAllCompleted::countDown;

        workerInfoList = new ArrayList<>();
        for (int i = 0; i < sinkInstances; i++) {
            WorkerInfo workerInfo = new WorkerInfo(jobId, jobId, stages.size(), i, workerNumber++, MantisJobDurationType.Perpetual,
                    "localhost", newLocalWorkerPorts(portSelector));
            workerInfoList.add(workerInfo);

            Context context = new Context(
                    ParameterUtils.createContextParameters(parameterDefinitions, parameters),
                    serviceLocator, workerInfo,
                    MetricsRegistry.getInstance(), nullAction, workerMapObservable,
                    Thread.currentThread().getContextClassLoader());

            startSink(previousStage, previousPorts, currentStage, () -> workerInfo.getWorkerPorts().getSinkPort(), sink,
                    context, countDownLatchOnTerminated,
                    nullOnCompleted, nullOnError, stages.size(), i, previousStageScalingInfo.getNumberOfInstances());
        }
        workerInfoMap.put(stages.size(), workerInfoList);
        workerMapObservable.onNext(new WorkerMap(workerInfoMap));

        // wait for all instances to complete
        awaitAllInstances(waitUntilAllCompleted);
    }

    /** Builds a {@link WorkerPorts} from five ports acquired, in order, from the given selector. */
    private static WorkerPorts newLocalWorkerPorts(PortSelector portSelector) {
        return new WorkerPorts(portSelector.acquirePort(), portSelector.acquirePort(), portSelector.acquirePort(),
                portSelector.acquirePort(), portSelector.acquirePort());
    }

    /** Blocks until the latch reaches zero; on interrupt, restores the interrupt flag and rethrows unchecked. */
    private static void awaitAllInstances(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            // keep the interrupt visible to callers further up the stack
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }