in mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/ExecuteStageRequestService.java [70:167]
public void start() {
subscription = executeStageRequestObservable
// map to request with status observer
.map(new Func1<WrappedExecuteStageRequest, TrackedExecuteStageRequest>() {
@Override
public TrackedExecuteStageRequest call(
WrappedExecuteStageRequest executeRequest) {
PublishSubject<Status> statusSubject = PublishSubject.create();
tasksStatusObserver.onNext(statusSubject);
return new TrackedExecuteStageRequest(executeRequest, statusSubject);
}
})
// get provider from jar, return tracked MantisJob
.flatMap(new Func1<TrackedExecuteStageRequest, Observable<ExecutionDetails>>() {
@SuppressWarnings("rawtypes") // raw type due to unknown type for mantis job
@Override
public Observable<ExecutionDetails> call(TrackedExecuteStageRequest executeRequest) {
ExecuteStageRequest executeStageRequest =
executeRequest.getExecuteRequest().getRequest();
Job mantisJob;
ClassLoader cl = null;
try {
if (!ExecuteStageRequestService.this.mantisJob.isPresent()) {
// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
logger.info("Loading JAR files for task {}.", this);
cl = userCodeClassLoader.asClassLoader();
if (jobProviderClass.isPresent()) {
logger.info("loading job main class " + jobProviderClass.get());
final MantisJobProvider jobProvider = InstantiationUtil.instantiate(
jobProviderClass.get(), MantisJobProvider.class, cl);
mantisJob = jobProvider.getJobInstance();
} else {
logger.info("using serviceLoader to get job instance");
ServiceLoader<MantisJobProvider> provider = ServiceLoader.load(
MantisJobProvider.class, cl);
// should only be a single provider, check is made in master
MantisJobProvider mantisJobProvider = provider.iterator()
.next();
mantisJob = mantisJobProvider.getJobInstance();
}
} else {
cl = userCodeClassLoader.asClassLoader();
mantisJob = ExecuteStageRequestService.this.mantisJob.get();
}
} catch (Throwable e) {
logger.error("Failed to load job class", e);
executeRequest.getStatus().onError(e);
return Observable.empty();
}
logger.info("Executing job {}", mantisJob);
return Observable.just(new ExecutionDetails(executeRequest.getExecuteRequest(),
executeRequest.getStatus(), mantisJob, cl, executeStageRequest.getParameters()));
}
})
.subscribe(new Observer<ExecutionDetails>() {
@Override
public void onCompleted() {
logger.error("Execute stage observable completed"); // should never occur
try {
executionOperations.shutdownStage();
} catch (IOException e) {
logger.error("Failed to close stage cleanly", e);
}
}
@Override
public void onError(Throwable e) {
logger.error("Execute stage observable threw exception", e);
}
@Override
public void onNext(final ExecutionDetails executionDetails) {
logger.info("Executing stage for job ID: " + executionDetails.getExecuteStageRequest().getRequest().getJobId());
Thread t = new Thread("mantis-worker-thread-" + executionDetails.getExecuteStageRequest().getRequest().getJobId()) {
@Override
public void run() {
// Add ports here
try {
executionOperations.executeStage(executionDetails);
} catch (Throwable t) {
logger.error("Failed to execute job stage", t);
}
}
};
// rebuild class path, job jar + parent class loader
// job jar to reference third party libraries and resources
// parent to reference worker code
ClassLoader cl = executionDetails.getClassLoader();
t.setContextClassLoader(cl);
t.setDaemon(true);
t.start();
}
});
}