public void start()

in mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/ExecuteStageRequestService.java [70:167]


    /**
     * Subscribes to {@code executeStageRequestObservable} and launches a worker thread for
     * every execute-stage request it emits. The resulting subscription is stored in
     * {@code subscription}.
     *
     * <p>Each request goes through three steps:
     * <ol>
     *   <li>A fresh {@code PublishSubject<Status>} is created, published on
     *       {@code tasksStatusObserver}, and paired with the request.</li>
     *   <li>The job instance is resolved: a job already held by this service is reused;
     *       otherwise a {@code MantisJobProvider} is instantiated from
     *       {@code jobProviderClass} when present, or discovered through
     *       {@link ServiceLoader}. Any failure in this step is logged, signalled on the
     *       request's status subject via {@code onError}, and the request is dropped
     *       (nothing is emitted downstream).</li>
     *   <li>A daemon thread named {@code mantis-worker-thread-<jobId>} is started with the
     *       user-code class loader as its context class loader; it calls
     *       {@code executionOperations.executeStage}.</li>
     * </ol>
     *
     * <p>If the request stream completes, {@code executionOperations.shutdownStage()} is
     * invoked. If the stream itself errors, the error is only logged.
     */
    public void start() {
        subscription = executeStageRequestObservable
                // pair every incoming request with its own status subject
                .map(new Func1<WrappedExecuteStageRequest, TrackedExecuteStageRequest>() {
                    @Override
                    public TrackedExecuteStageRequest call(
                            WrappedExecuteStageRequest executeRequest) {
                        PublishSubject<Status> statusSubject = PublishSubject.create();
                        // hand the subject to the status observer before anything can be published on it
                        tasksStatusObserver.onNext(statusSubject);
                        return new TrackedExecuteStageRequest(executeRequest, statusSubject);
                    }
                })
                // resolve the job instance (from the provider or the pre-supplied job)
                .flatMap(new Func1<TrackedExecuteStageRequest, Observable<ExecutionDetails>>() {
                    @SuppressWarnings("rawtypes") // raw type due to unknown type for mantis job
                    @Override
                    public Observable<ExecutionDetails> call(TrackedExecuteStageRequest executeRequest) {

                        ExecuteStageRequest executeStageRequest =
                                executeRequest.getExecuteRequest().getRequest();

                        Job mantisJob;
                        ClassLoader cl = null;
                        try {
                            if (!ExecuteStageRequestService.this.mantisJob.isPresent()) {
                                // first of all, get a user-code classloader
                                // this may involve downloading the job's JAR files and/or classes
                                // Log the job ID: `this` here would be the anonymous Func1 instance,
                                // which says nothing about the task being loaded.
                                logger.info("Loading JAR files for job {}.", executeStageRequest.getJobId());

                                cl = userCodeClassLoader.asClassLoader();
                                if (jobProviderClass.isPresent()) {
                                    logger.info("loading job main class {}", jobProviderClass.get());
                                    final MantisJobProvider jobProvider = InstantiationUtil.instantiate(
                                        jobProviderClass.get(), MantisJobProvider.class, cl);
                                    mantisJob = jobProvider.getJobInstance();
                                } else {
                                    logger.info("using serviceLoader to get job instance");
                                    ServiceLoader<MantisJobProvider> provider = ServiceLoader.load(
                                        MantisJobProvider.class, cl);
                                    // should only be a single provider, check is made in master;
                                    // if none is found, next() throws and the catch below reports it
                                    MantisJobProvider mantisJobProvider = provider.iterator()
                                        .next();
                                    mantisJob = mantisJobProvider.getJobInstance();
                                }
                            } else {
                                // job was supplied up front; the class loader is still needed
                                // as the worker thread's context class loader
                                cl = userCodeClassLoader.asClassLoader();
                                mantisJob = ExecuteStageRequestService.this.mantisJob.get();
                            }
                        } catch (Throwable e) {
                            // report the failure on this request's status subject and drop the
                            // request, so the outer stream is not terminated by the error
                            logger.error("Failed to load job class", e);
                            executeRequest.getStatus().onError(e);
                            return Observable.empty();
                        }
                        logger.info("Executing job {}", mantisJob);
                        return Observable.just(new ExecutionDetails(executeRequest.getExecuteRequest(),
                                executeRequest.getStatus(), mantisJob, cl, executeStageRequest.getParameters()));
                    }
                })
                .subscribe(new Observer<ExecutionDetails>() {
                    @Override
                    public void onCompleted() {
                        logger.error("Execute stage observable completed"); // should never occur
                        try {
                            executionOperations.shutdownStage();
                        } catch (IOException e) {
                            logger.error("Failed to close stage cleanly", e);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        logger.error("Execute stage observable threw exception", e);
                    }

                    @Override
                    public void onNext(final ExecutionDetails executionDetails) {
                        logger.info("Executing stage for job ID: {}",
                                executionDetails.getExecuteStageRequest().getRequest().getJobId());
                        Thread workerThread = new Thread("mantis-worker-thread-"
                                + executionDetails.getExecuteStageRequest().getRequest().getJobId()) {
                            @Override
                            public void run() {
                                // Add ports here
                                try {
                                    executionOperations.executeStage(executionDetails);
                                } catch (Throwable failure) {
                                    // log only: nothing else observes an exception escaping this thread
                                    logger.error("Failed to execute job stage", failure);
                                }
                            }
                        };
                        // rebuild class path, job jar + parent class loader
                        // job jar to reference third party libraries and resources
                        // parent to reference worker code
                        ClassLoader cl = executionDetails.getClassLoader();
                        workerThread.setContextClassLoader(cl);
                        workerThread.setDaemon(true);
                        workerThread.start();
                    }
                });
    }