public MantisWorker()

in mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/MantisWorker.java [72:187]


    public MantisWorker(ConfigurationFactory configFactory, Optional<Job> jobToRun) {
        // for rxjava
        System.setProperty("rx.ring-buffer.size", "1024");

        WorkerConfiguration config = configFactory.getConfig();
        final HighAvailabilityServices highAvailabilityServices =
            HighAvailabilityServicesUtil.createHAServices(config);
        mantisServices.add(new Service() {
            @Override
            public void start() {
                highAvailabilityServices.startAsync().awaitRunning();
            }

            @Override
            public void shutdown() {
                highAvailabilityServices.stopAsync().awaitTerminated();
            }

            @Override
            public void enterActiveMode() {

            }

            @Override
            public String toString() {
                return "HighAvailabilityServices Service";
            }
        });
        final MantisMasterGateway gateway =
            highAvailabilityServices.getMasterClientApi();
        // shutdown hook
        Thread t = new Thread() {
            @Override
            public void run() {
                shutdown();
            }
        };
        t.setDaemon(true);
        Runtime.getRuntime().addShutdownHook(t);

        // services
        // metrics

        PublishSubject<WrappedExecuteStageRequest> executeStageSubject = PublishSubject.create();
        // TODO(sundaram): inline services are hard to read. Would be good to refactor this.
        mantisServices.add(new Service() {
            private RuntimeTaskImpl runtimeTaskImpl;
            private Subscription vmStatusSubscription;

            @Override
            public void start() {
                final ClassLoader classLoader;
                if (Thread.currentThread().getContextClassLoader() == null) {
                    classLoader = ClassLoader.getSystemClassLoader();
                    logger.info("Choosing system classloader {}", classLoader);
                } else {
                    classLoader = Thread.currentThread().getContextClassLoader();
                    logger.info("Choosing current thread classloader {}", classLoader);
                }

                executeStageSubject
                    .asObservable()
                    .first()
                    .subscribe(wrappedRequest -> {
                        try {
                            runtimeTaskImpl = new RuntimeTaskImpl();

                            // invoke internal runtimeTaskImpl initialize to inject the wrapped request.
                            runtimeTaskImpl.initialize(
                                wrappedRequest,
                                config,
                                gateway,
                                ClassLoaderHandle.fixed(classLoader).getOrResolveClassLoader(
                                    ImmutableList.of(), ImmutableList.of()),
                                SinkSubscriptionStateHandler
                                    .Factory
                                    .forEphemeralJobsThatNeedToBeKilledInAbsenceOfSubscriber(
                                        gateway,
                                        Clock.systemDefaultZone()));
                            runtimeTaskImpl.setJob(jobToRun);

                            runtimeTaskImpl.startAsync();
                        } catch (Exception ex) {
                            logger.error("Failed to start task, request: {}", wrappedRequest, ex);
                            throw new RuntimeException("Failed to start task", ex);
                        }
                    });
            }

            @Override
            public void shutdown() {
                if (runtimeTaskImpl != null) {
                    try {
                        runtimeTaskImpl.stopAsync().awaitTerminated();
                    } finally {
                        vmStatusSubscription.unsubscribe();
                    }
                }
            }

            @Override
            public void enterActiveMode() {

            }

            @Override
            public String toString() {
                return "TaskService";
            }
        });
        /* To run MantisWorker locally in IDE, use VirualMachineWorkerServiceLocalImpl instead
        WorkerTopologyInfo.Data workerData = new WorkerTopologyInfo.Data(data.getJobName(), data.getJobId(),
            data.getWorkerIndex(), data.getWorkerNumber(), data.getStageNumber(), data.getNumStages(), -1, -1, data.getMetricsPort());
        mantisServices.add(new VirtualMachineWorkerServiceLocalImpl(workerData, executeStageSubject, vmTaskStatusSubject));
        */
    }