in mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java [267:501]
/**
 * Executes the stage assigned to this worker, as described by the {@link ExecuteStageRequest}
 * inside {@code setup}. Depending on the stage number this either:
 * <ul>
 *   <li>stage 0: runs the Job Master (autoscaler) service (V1 or V2 based on a job parameter),</li>
 *   <li>single-stage job: runs source + sink on this worker,</li>
 *   <li>stage 1 of a multi-stage job: runs the source stage and serves its remote observable,</li>
 *   <li>otherwise: runs an intermediate or sink stage via {@code executeNonSourceStage}.</li>
 * </ul>
 * Blocks until the worker terminates (or the served remote observable shuts down), then invokes
 * {@code lifecycle.shutdown()}. Any failure is reported via {@code rw.signalFailed} followed by
 * {@code shutdownStage()}.
 *
 * @param setup execution details carrying the stage request, job provider, status observer and
 *              job parameters
 * @throws IOException if stage setup fails with an I/O error
 */
public void executeStage(final ExecutionDetails setup) throws IOException {
    ExecuteStageRequest executionRequest = setup.getExecuteStageRequest().getRequest();
    jobStatusObserver = setup.getStatus();
    // Initialize the schedulingInfo observable for the current job and mark it shareable
    // (replay(1) + refCount) so any late subscriber immediately receives the latest snapshot.
    // JobSchedulingInfo has metadata around which stage runs on which set of workers.
    Observable<JobSchedulingInfo> selfSchedulingInfo =
        mantisMasterApi.schedulingChanges(executionRequest.getJobId())
            .subscribeOn(Schedulers.io())
            .replay(1)
            .refCount()
            .doOnSubscribe(() -> logger.info("mantisApi schedulingChanges subscribe"))
            .doOnUnsubscribe(() -> logger.info("mantisApi schedulingChanges stream unsub."))
            .doOnError(e -> logger.warn("mantisApi schedulingChanges stream error:", e))
            .doOnCompleted(() -> logger.info("mantisApi schedulingChanges stream completed."));
    // Represents the data structure that has the current worker information and what it
    // represents in the overall operator DAG.
    WorkerInfo workerInfo = generateWorkerInfo(executionRequest.getJobName(), executionRequest.getJobId(),
        executionRequest.getStage(), executionRequest.getWorkerIndex(),
        executionRequest.getWorkerNumber(), executionRequest.getDurationType(), "host", executionRequest.getWorkerPorts());
    // Observable that represents the number of workers for the source stage.
    final Observable<Integer> sourceStageTotalWorkersObs = createSourceStageTotalWorkersObservable(selfSchedulingInfo);
    RunningWorker.Builder rwBuilder = new RunningWorker.Builder()
        .job(setup.getMantisJob())
        .schedulingInfo(executionRequest.getSchedulingInfo())
        .stageTotalWorkersObservable(sourceStageTotalWorkersObs)
        .jobName(executionRequest.getJobName())
        .stageNum(executionRequest.getStage())
        .workerIndex(executionRequest.getWorkerIndex())
        .workerNum(executionRequest.getWorkerNumber())
        .totalStages(executionRequest.getTotalNumStages())
        .metricsPort(executionRequest.getMetricsPort())
        .ports(executionRequest.getPorts().iterator())
        .jobStatusObserver(setup.getStatus())
        .requestSubject(setup.getExecuteStageRequest().getRequestSubject())
        .workerInfo(workerInfo)
        .hasJobMaster(executionRequest.getHasJobMaster())
        .jobId(executionRequest.getJobId());
    if (executionRequest.getStage() == 0) {
        // Stage 0 is the Job Master; it gets a synthetic stage config rather than a user stage.
        rwBuilder = rwBuilder.stage(new JobMasterStageConfig("jobmasterconfig"));
    } else {
        // User stages are 1-indexed in the request but 0-indexed in the job's stage list.
        rwBuilder = rwBuilder.stage((StageConfig) setup.getMantisJob()
            .getStages().get(executionRequest.getStage() - 1));
    }
    final RunningWorker rw = rwBuilder.build();
    if (rw.getStageNum() == rw.getTotalStagesNet()) {
        // Set up subscription state handler only for the sink (last) stage.
        setupSubscriptionStateHandler(setup.getExecuteStageRequest().getRequest());
    }
    logger.info("Running worker info: " + rw);
    rw.signalStartedInitiated();
    try {
        logger.info(">>>>>>>>>>>>>>>>Calling lifecycle.startup()");
        Lifecycle lifecycle = rw.getJob().getLifecycle();
        lifecycle.startup();
        ServiceLocator serviceLocator = lifecycle.getServiceLocator();
        if (lookupSpectatorRegistry) {
            try {
                final Registry spectatorRegistry = serviceLocator.service(Registry.class);
                SpectatorRegistryFactory.setRegistry(spectatorRegistry);
            } catch (Throwable t) {
                // Best-effort: fall back to the default registry, but keep the failure detail
                // in the log (pass t as the trailing SLF4J throwable argument).
                logger.error("failed to init spectator registry using service locator, falling back to {}",
                    SpectatorRegistryFactory.getRegistry().getClass().getCanonicalName(), t);
            }
        }
        // Ensure netty clients' listeners are set. This is redundant to the settings in
        // TaskExecutor to ensure the integration at runtime level.
        MantisNettyEventsListenerFactory mantisNettyEventsListenerFactory = new MantisNettyEventsListenerFactory();
        RxNetty.useMetricListenersFactory(mantisNettyEventsListenerFactory);
        SseWorkerConnection.useMetricListenersFactory(mantisNettyEventsListenerFactory);
        // Create the job context from declared parameter definitions plus supplied values.
        Parameters parameters = ParameterUtils
            .createContextParameters(rw.getJob().getParameterDefinitions(),
                setup.getParameters());
        final Context context = generateContext(parameters, serviceLocator, workerInfo, MetricsRegistry.getInstance(),
            () -> {
                rw.signalCompleted();
                // Wait for the completion signal to reach the master and for us to get killed.
                // Upon timeout, exit the process ourselves.
                try {
                    Thread.sleep(60000);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag before exiting, per convention.
                    Thread.currentThread().interrupt();
                    logger.warn("Unexpected exception sleeping: " + ie.getMessage());
                }
                System.exit(0);
            }, createWorkerMapObservable(selfSchedulingInfo, executionRequest.getJobName(), executionRequest.getJobId(), executionRequest.getDurationType()),
            classLoader
        );
        rw.setContext(context);
        // Set up heartbeats to the master for liveness tracking.
        heartbeatRef.set(new Heartbeat(rw.getJobId(),
            rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum(), config.getTaskExecutorHostName()));
        Closeable heartbeatCloseable = startSendingHeartbeats(rw.getJobStatus(),
            executionRequest.getHeartbeatIntervalSecs());
        closeables.add(heartbeatCloseable);
        // Execute the stage according to its position in the DAG.
        if (rw.getStageNum() == 0) {
            logger.info("JobId: " + rw.getJobId() + ", executing Job Master");
            final AutoScaleMetricsConfig autoScaleMetricsConfig = new AutoScaleMetricsConfig();
            // Temporary workaround to enable auto-scaling by custom metric in Job Master. This
            // will be revisited to get the entire autoscaling config for a job as a System
            // parameter in the JobMaster. Expected format: metricGroup::metricName::algo.
            final String autoScaleMetricString = (String) parameters.get(JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM, "");
            if (!Strings.isNullOrEmpty(autoScaleMetricString)) {
                final List<String> tokens = Splitter.on("::").omitEmptyStrings().trimResults().splitToList(autoScaleMetricString);
                if (tokens.size() == 3) {
                    final String metricGroup = tokens.get(0);
                    final String metricName = tokens.get(1);
                    final String algo = tokens.get(2);
                    try {
                        final AutoScaleMetricsConfig.AggregationAlgo aggregationAlgo = AutoScaleMetricsConfig.AggregationAlgo.valueOf(algo);
                        logger.info("registered UserDefined auto scale metric {}:{} algo {}", metricGroup, metricName, aggregationAlgo);
                        autoScaleMetricsConfig.addUserDefinedMetric(metricGroup, metricName, aggregationAlgo);
                    } catch (IllegalArgumentException e) {
                        // Report the offending algo token (not the config object) and keep the
                        // original exception as the cause.
                        final String errorMsg = String.format("ERROR: Invalid algorithm value %s for param %s (algo should be one of %s)",
                            algo, JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM,
                            Arrays.stream(AutoScaleMetricsConfig.AggregationAlgo.values()).map(a -> a.name()).collect(Collectors.toList()));
                        logger.error(errorMsg);
                        throw new RuntimeException(errorMsg, e);
                    }
                } else {
                    final String errorMsg = String.format("ERROR: Invalid value %s for param %s", autoScaleMetricString, JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM);
                    logger.error(errorMsg);
                    throw new RuntimeException(errorMsg);
                }
            } else {
                logger.info("param {} is null or empty", JOB_MASTER_AUTOSCALE_METRIC_SYSTEM_PARAM);
            }
            JobAutoscalerManager jobAutoscalerManager = getJobAutoscalerManagerInstance(serviceLocator);
            // Switch between V1/V2 scaler control via job parameter (defaults to V2).
            final Boolean useV2ScalerService = (Boolean) parameters.get(JOB_AUTOSCALE_V2_ENABLED_PARAM, true);
            if (useV2ScalerService) {
                logger.info("[V2 AUTO-SCALER ENABLED] Using V2 JobAutoScalerService: JobMasterServiceV2");
                // Build the JobScalerContext handed to the dynamically loaded V2 service.
                JobScalerContext jobScalerContext = JobScalerContext.builder()
                    .jobId(rw.getJobId())
                    .schedInfo(rw.getSchedulingInfo())
                    .workerMetricsClient(workerMetricsClient)
                    .autoScaleMetricsConfig(autoScaleMetricsConfig)
                    .masterClientApi(mantisMasterApi)
                    .context(rw.getContext())
                    .observableOnCompleteCallback(rw.getOnCompleteCallback())
                    .observableOnErrorCallback(rw.getOnErrorCallback())
                    .observableOnTerminateCallback(rw.getOnTerminateCallback())
                    .jobAutoscalerManager(jobAutoscalerManager)
                    .build();
                logger.info("Creating JobMasterServiceV2 using ComponentClassLoader");
                final String jmLoaderConfigString = (String) parameters.get(JOB_AUTOSCALE_V2_LOADER_CONFIG_PARAM, "");
                JobMasterComponentLoader jobMasterComponentLoader = JobMasterComponentLoader.fromAkkaRpc(jmLoaderConfigString);
                Service jobMasterServiceV2 = jobMasterComponentLoader.createAndStartJobMasterServiceV2(jobScalerContext);
                logger.info("Created JobMasterServiceV2 using ComponentClassLoader");
                closeables.add(jobMasterServiceV2::shutdown);
            } else {
                logger.info("Using V1 JobMasterService");
                JobMasterService jobMasterService = new JobMasterService(rw.getJobId(), rw.getSchedulingInfo(),
                    workerMetricsClient, autoScaleMetricsConfig, mantisMasterApi, rw.getContext(), rw.getOnCompleteCallback(), rw.getOnErrorCallback(), rw.getOnTerminateCallback(), jobAutoscalerManager);
                jobMasterService.start();
                closeables.add(jobMasterService::shutdown);
            }
            signalStarted(rw);
            // Block until the worker terminates.
            rw.waitUntilTerminate();
        } else if (rw.getStageNum() == 1 && rw.getTotalStagesNet() == 1) {
            logger.info("JobId: " + rw.getJobId() + ", single stage job, executing entire job");
            // Single stage: execute the entire job (source + sink) on this machine.
            PortSelector portSelector = new PortSelector() {
                @Override
                public int acquirePort() {
                    return rw.getPorts().next();
                }
            };
            RxMetrics rxMetrics = new RxMetrics();
            closeables.add(StageExecutors.executeSingleStageJob(rw.getJob().getSource(), rw.getStage(),
                rw.getJob().getSink(), portSelector, rxMetrics, rw.getContext(),
                rw.getOnTerminateCallback(), rw.getWorkerIndex(),
                rw.getSourceStageTotalWorkersObservable(),
                onSinkSubscribe, onSinkUnsubscribe,
                rw.getOnCompleteCallback(), rw.getOnErrorCallback()));
            signalStarted(rw);
            // Block until the worker terminates.
            rw.waitUntilTerminate();
        } else {
            logger.info("JobId: " + rw.getJobId() + ", executing a multi-stage job, stage: " + rw.getStageNum());
            if (rw.getStageNum() == 1) {
                // Execute the source stage and serve its output as a remote observable that the
                // next stage's workers connect to.
                String remoteObservableName = rw.getJobId() + "_" + rw.getStageNum();
                WorkerPublisherRemoteObservable publisher
                    = new WorkerPublisherRemoteObservable<>(rw.getPorts().next(),
                    remoteObservableName, numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1),
                    rw.getJobName(), getRouterFactoryInstance(serviceLocator));
                closeables.add(StageExecutors.executeSource(rw.getWorkerIndex(), rw.getJob().getSource(),
                    rw.getStage(), publisher, rw.getContext(), rw.getSourceStageTotalWorkersObservable()));
                logger.info("JobId: " + rw.getJobId() + " stage: " + rw.getStageNum() + ", serving remote observable for source with name: " + remoteObservableName);
                RemoteRxServer server = publisher.getServer();
                RxMetrics rxMetrics = server.getMetrics();
                MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges());
                signalStarted(rw);
                logger.info("JobId: " + rw.getJobId() + " stage: " + rw.getStageNum() + ", blocking until source observable completes");
                server.blockUntilServerShutdown();
            } else {
                // Execute an intermediate stage, or the last stage plus sink.
                executeNonSourceStage(selfSchedulingInfo, rw, serviceLocator);
            }
        }
        logger.info("Calling lifecycle.shutdown()");
        lifecycle.shutdown();
    } catch (Throwable t) {
        logger.warn("Error during executing stage; shutting down.", t);
        rw.signalFailed(t);
        shutdownStage();
    }
}