in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/platform/CrossPlatformExecutor.java [1076:1172]
/**
 * Executes activated stages of the enclosing {@link CrossPlatformExecutor} on this thread.
 *
 * <p>Each iteration polls one {@link StageActivator} from the shared queue, executes its stage on the
 * matching {@link Executor}, records the stage as completed, tries to activate its successors, and then
 * disposes the activator together with every {@link ChannelInstance} that has exactly one reference left.
 * If more than one stage is activated afterwards, additional {@code ParallelExecutionThread}s are started
 * and registered in {@code parallelExecutionThreads}.</p>
 *
 * <p>The loop ends when the queue yields no activator, when {@code suspendIfBreakpointRequest} returns
 * {@code true}, or when the loop condition fails (no activated stage is left, or at most one registered
 * thread has not yet completed).</p>
 */
public void run() {
    StageActivator stageActivator;
    this.crossPlatformExecutor.logger.info("Thread {} started", this.threadId);

    // Loop until there is no activated stage or only one thread running.
    do {
        // Get the stageActivator for the stage to execute.
        synchronized (this.crossPlatformExecutor) {
            stageActivator = this.crossPlatformExecutor.activatedStageActivators.poll();
            if (stageActivator == null) {
                break;
            }
        }
        this.crossPlatformExecutor.logger.info("{} started executing Stage: {}:", this.threadId, stageActivator.getStage());

        // Check if #breakpoint permits the execution.
        // NOTE(review): this early exit skips the completedThreads increment at the end of the method --
        // confirm that this is intended.
        if (!this.thread_isBreakpointDisabled && this.crossPlatformExecutor.suspendIfBreakpointRequest(stageActivator)) {
            return;
        }

        // Otherwise, execute the stage.
        final ExecutionStage stage = stageActivator.getStage();
        final OptimizationContext optimizationContext = stageActivator.getOptimizationContext();

        // Find parts of the stage to instrument.
        this.crossPlatformExecutor.instrumentationStrategy.applyTo(stage);

        // Obtain an Executor for the stage.
        final Executor executor = this.crossPlatformExecutor.getOrCreateExecutorFor(stage);

        // Have the execution done.
        CrossPlatformExecutor.this.logger.info("Having {} execute {}:\n{}", executor, stage, stage.getPlanAsString("> "));
        final long startTime = System.currentTimeMillis();

        // Stages that share an Executor are serialized on it. Synchronizing on the CrossPlatformExecutor
        // instead would additionally serialize stages that run on different Executors.
        synchronized (executor) {
            executor.execute(stage, optimizationContext, this.crossPlatformExecutor);
            final long finishTime = System.currentTimeMillis();
            CrossPlatformExecutor.this.logger.info("Executed {} in {}.", stage, Formats.formatDuration(finishTime - startTime, true));

            // Remember that we have executed the stage.
            this.crossPlatformExecutor.completedStages.add(stage);
            if (stage.isLoopHead()) {
                this.crossPlatformExecutor.getOrCreateLoopContext(stage.getLoop()).scrapPreviousTransitionContext();
            }

            // Try to activate the successor stages.
            this.crossPlatformExecutor.tryToActivateSuccessors(stageActivator);
        }

        // We can now dispose the stageActivator that collected the input ChannelInstances.
        stageActivator.dispose();

        // Dispose obsolete ChannelInstances.
        // NOTE(review): channelInstances is iterated and mutated here without holding a lock -- verify that
        // this is safe when several ParallelExecutionThreads run concurrently.
        final Iterator<Map.Entry<Channel, ChannelInstance>> iterator = this.crossPlatformExecutor.channelInstances.entrySet().iterator();
        while (iterator.hasNext()) {
            final Map.Entry<Channel, ChannelInstance> channelInstanceEntry = iterator.next();
            final ChannelInstance channelInstance = channelInstanceEntry.getValue();

            // Discard the ChannelInstance if exactly one reference to it is left.
            if (channelInstance.getNumReferences() == 1) {
                channelInstance.noteDiscardedReference(true);
                iterator.remove();
            }
        }
        this.crossPlatformExecutor.logger.info("{} completed executing Stage : {}:", this.threadId, stageActivator.getStage());

        // Create new threads for more than one activated stages recursively.
        if (CrossPlatformExecutor.this.activatedStageActivators.size() > 1) {
            // Create one thread per activated stage beyond the one this thread picks up itself. The bound is
            // deliberately re-read on every iteration because other threads may poll stages meanwhile.
            for (int i = 1; i <= CrossPlatformExecutor.this.activatedStageActivators.size() - 1; i++) {
                // TODO: Better use Java's ForkJoinPool to reduce thread creation overhead and control concurrency.
                // Create parallel stage execution thread.
                final Thread thread = new Thread(new ParallelExecutionThread(this.thread_isBreakpointDisabled, "T" + i + "@" + this.threadId, this.crossPlatformExecutor));

                // Start and register the thread under the same lock that the new thread must acquire before
                // polling its first stage. This way the thread cannot execute a stage or count itself as
                // completed before it is contained in {@link #parallelExecutionThreads}.
                synchronized (this.crossPlatformExecutor) {
                    thread.start();
                    CrossPlatformExecutor.this.parallelExecutionThreads.add(thread);
                }
            }
        }
    }
    // Continue while stages are still activated and more than one registered thread has not completed.
    // TODO: A ForkJoinPool could replace this hand-rolled coordination.
    while (CrossPlatformExecutor.this.activatedStageActivators.size() >= 1 &&
            CrossPlatformExecutor.this.parallelExecutionThreads.size() - CrossPlatformExecutor.this.completedThreads > 1);

    // Count this thread as completed. "volatile" only guarantees visibility; the increment itself is a
    // read-modify-write and must be guarded so that concurrent increments are not lost.
    synchronized (CrossPlatformExecutor.this) {
        CrossPlatformExecutor.this.completedThreads++;
    }

    // Notify thread ended.
    CrossPlatformExecutor.this.logger.info("{} ended", this.threadId);
}