in mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/MantisWorker.java [72:187]
/**
 * Creates the worker and registers its services in {@code mantisServices}.
 *
 * <p>Two services are added, in this order:
 * <ol>
 *   <li>a wrapper that starts and stops the {@link HighAvailabilityServices} built from the
 *       worker configuration;</li>
 *   <li>a "TaskService" that waits for the first {@link WrappedExecuteStageRequest} emitted on
 *       an internal subject and then creates, initializes and starts a
 *       {@link RuntimeTaskImpl} for it.</li>
 * </ol>
 *
 * <p>The constructor also sets the {@code rx.ring-buffer.size} system property and installs a
 * JVM shutdown hook that calls {@code shutdown()} on this worker.
 *
 * @param configFactory source of the {@link WorkerConfiguration} used by the services
 * @param jobToRun      value handed to {@link RuntimeTaskImpl#setJob} once a task is created
 */
public MantisWorker(ConfigurationFactory configFactory, Optional<Job> jobToRun) {
    // for rxjava
    System.setProperty("rx.ring-buffer.size", "1024");
    WorkerConfiguration config = configFactory.getConfig();

    final HighAvailabilityServices highAvailabilityServices =
        HighAvailabilityServicesUtil.createHAServices(config);
    mantisServices.add(new Service() {
        @Override
        public void start() {
            highAvailabilityServices.startAsync().awaitRunning();
        }

        @Override
        public void shutdown() {
            highAvailabilityServices.stopAsync().awaitTerminated();
        }

        @Override
        public void enterActiveMode() {
        }

        @Override
        public String toString() {
            return "HighAvailabilityServices Service";
        }
    });
    final MantisMasterGateway gateway =
        highAvailabilityServices.getMasterClientApi();

    // shutdown hook: delegates to this worker's shutdown() when the JVM exits
    Thread shutdownHook = new Thread(this::shutdown);
    shutdownHook.setDaemon(true);
    Runtime.getRuntime().addShutdownHook(shutdownHook);

    // NOTE(review): nothing in this constructor emits on this subject; the only producer
    // referenced here is the commented-out local service at the bottom -- confirm where
    // requests are expected to come from.
    PublishSubject<WrappedExecuteStageRequest> executeStageSubject = PublishSubject.create();
    // TODO(sundaram): inline services are hard to read. Would be good to refactor this.
    mantisServices.add(new Service() {
        private RuntimeTaskImpl runtimeTaskImpl;
        // Subscription waiting for the first execute-stage request; released on shutdown.
        private Subscription executeStageSubscription;

        @Override
        public void start() {
            // The class loader is picked on the thread that calls start(), not on the thread
            // that later delivers the request.
            final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            final ClassLoader classLoader;
            if (contextClassLoader == null) {
                classLoader = ClassLoader.getSystemClassLoader();
                logger.info("Choosing system classloader {}", classLoader);
            } else {
                classLoader = contextClassLoader;
                logger.info("Choosing current thread classloader {}", classLoader);
            }

            // Keep the subscription so shutdown() can release it even if no request ever arrives.
            executeStageSubscription = executeStageSubject
                .asObservable()
                .first()
                .subscribe(wrappedRequest -> {
                    try {
                        runtimeTaskImpl = new RuntimeTaskImpl();
                        // invoke internal runtimeTaskImpl initialize to inject the wrapped request.
                        runtimeTaskImpl.initialize(
                            wrappedRequest,
                            config,
                            gateway,
                            ClassLoaderHandle.fixed(classLoader).getOrResolveClassLoader(
                                ImmutableList.of(), ImmutableList.of()),
                            SinkSubscriptionStateHandler
                                .Factory
                                .forEphemeralJobsThatNeedToBeKilledInAbsenceOfSubscriber(
                                    gateway,
                                    Clock.systemDefaultZone()));
                        runtimeTaskImpl.setJob(jobToRun);
                        runtimeTaskImpl.startAsync();
                    } catch (Exception ex) {
                        logger.error("Failed to start task, request: {}", wrappedRequest, ex);
                        throw new RuntimeException("Failed to start task", ex);
                    }
                });
        }

        @Override
        public void shutdown() {
            try {
                if (runtimeTaskImpl != null) {
                    runtimeTaskImpl.stopAsync().awaitTerminated();
                }
            } finally {
                // Null when start() was never called; guarding here keeps a failure from
                // awaitTerminated() from being replaced by a NullPointerException.
                if (executeStageSubscription != null) {
                    executeStageSubscription.unsubscribe();
                }
            }
        }

        @Override
        public void enterActiveMode() {
        }

        @Override
        public String toString() {
            return "TaskService";
        }
    });

    /* To run MantisWorker locally in IDE, use VirtualMachineWorkerServiceLocalImpl instead
    WorkerTopologyInfo.Data workerData = new WorkerTopologyInfo.Data(data.getJobName(), data.getJobId(),
        data.getWorkerIndex(), data.getWorkerNumber(), data.getStageNumber(), data.getNumStages(), -1, -1, data.getMetricsPort());
    mantisServices.add(new VirtualMachineWorkerServiceLocalImpl(workerData, executeStageSubject, vmTaskStatusSubject));
    */
}