in mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java [558:634]
/**
 * Executes a non-source stage on this worker: either an intermediate (middle) stage or the
 * last stage together with the job's sink.
 *
 * <p>The method first connects to the previous stage via
 * {@code connectToObservableAtPreviousStages}, then takes the next port from
 * {@code rw.getPorts()} and branches:
 * <ul>
 *   <li>if {@code rw.getStageNum() == rw.getTotalStagesNet()}, it emits a {@code Started}
 *       status, runs {@code StageExecutors.executeSink} and blocks on a latch that is released
 *       by the completion action handed to {@code executeSink};</li>
 *   <li>otherwise it runs {@code StageExecutors.executeIntermediate} behind a
 *       {@code WorkerPublisherRemoteObservable}, calls {@code signalStarted(rw)} and blocks in
 *       {@code server.blockUntilServerShutdown()}.</li>
 * </ul>
 *
 * <p>The {@code acceptSchedulingChanges} flag handed to the previous-stage connection is set
 * to {@code false} when this method leaves the execution section, whether it returns normally
 * or an exception propagates.
 *
 * @param selfSchedulingInfo scheduling-info stream; forwarded to the previous-stage connection
 *                           and to {@code numWorkersAtStage} for the next stage
 * @param rw                 the running worker that supplies job, stage, port, context and
 *                           status-reporting handles
 * @param serviceLocator     forwarded to the previous-stage connection and to
 *                           {@code getRouterFactoryInstance}
 */
private void executeNonSourceStage(Observable<JobSchedulingInfo> selfSchedulingInfo, final RunningWorker rw, ServiceLocator serviceLocator) {
    // execute either intermediate (middle) stage or last+sink
    StageConfig previousStageExecuting = (StageConfig) rw.getJob().getStages()
        .get(rw.getStageNum() - 2); // note, stages are zero indexed
    StageSchedulingInfo previousSchedulingInfo = rw.getSchedulingInfo().forStage(rw.getStageNum() - 1);
    int numInstanceAtPreviousStage = previousSchedulingInfo.getNumberOfInstances();

    // Shared with the previous-stage connection; flipped to false once this stage is done.
    AtomicBoolean acceptSchedulingChanges = new AtomicBoolean(true);
    WorkerConsumer consumer = connectToObservableAtPreviousStages(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() - 1,
        numInstanceAtPreviousStage, previousStageExecuting, acceptSchedulingChanges,
        rw.getJobStatus(), rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum(), serviceLocator);

    try {
        final int workerPort = rw.getPorts().next();
        if (rw.getStageNum() == rw.getTotalStagesNet()) {
            // last+sink stage
            logger.info(
                "JobId: {}, executing sink stage: {}, signaling started", rw.getJobId(), rw.getStageNum());
            // NOTE(review): the intermediate branch reports start through signalStarted(rw),
            // and only after its server exists; here the status is emitted by hand and before
            // executeSink runs. Confirm whether the two paths are meant to differ.
            rw.getJobStatus().onNext(new Status(rw.getJobId(), rw.getStageNum(), rw.getWorkerIndex(),
                rw.getWorkerNum(),
                TYPE.INFO, String.format(STATUS_MESSAGE_FORMAT, rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum(), "running"),
                MantisJobState.Started));

            // Always hands out the single port reserved for this worker above.
            PortSelector portSelector = new PortSelector() {
                @Override
                public int acquirePort() {
                    return workerPort;
                }
            };
            RxMetrics rxMetrics = new RxMetrics();
            MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges());

            // Released by the action passed to executeSink; keeps this thread parked meanwhile.
            final CountDownLatch blockUntilComplete = new CountDownLatch(1);
            Action0 countDownLatch = new Action0() {
                @Override
                public void call() {
                    blockUntilComplete.countDown();
                }
            };
            closeables.add(StageExecutors.executeSink(consumer, rw.getStage(),
                rw.getJob().getSink(), portSelector, rxMetrics,
                rw.getContext(), countDownLatch, onSinkSubscribe, onSinkUnsubscribe,
                rw.getOnCompleteCallback(), rw.getOnErrorCallback()));

            // block until completes
            try {
                blockUntilComplete.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
            }
        } else {
            // intermediate stage
            logger.info("JobId: {}, executing intermediate stage: {}", rw.getJobId(), rw.getStageNum());
            int stageNumToExecute = rw.getStageNum();
            String jobId = rw.getJobId();
            String remoteObservableName = jobId + "_" + stageNumToExecute;

            // Raw type kept from the original: the element type is not visible in this method.
            WorkerPublisherRemoteObservable publisher
                = new WorkerPublisherRemoteObservable<>(workerPort, remoteObservableName,
                numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1), rw.getJobName(),
                getRouterFactoryInstance(serviceLocator)
            );
            closeables.add(StageExecutors.executeIntermediate(consumer, rw.getStage(), publisher,
                rw.getContext()));
            RemoteRxServer server = publisher.getServer();
            logger.info("JobId: {} stage: {}, serving intermediate remote observable with name: {}",
                jobId, stageNumToExecute, remoteObservableName);
            RxMetrics rxMetrics = server.getMetrics();
            MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges());

            // send running signal only after server is started
            signalStarted(rw);
            logger.info("JobId: {} stage: {}, blocking until intermediate observable completes",
                jobId, stageNumToExecute);
            server.blockUntilServerShutdown();
        }
    } finally {
        // Stop accepting scheduling changes however the stage ended (normal return,
        // interruption, or an exception from the executor/server calls above).
        acceptSchedulingChanges.set(false);
    }
}