public void run()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/platform/CrossPlatformExecutor.java [1076:1172]


        public void run() {
            // Worker loop of one parallel execution thread: repeatedly takes an activated stage from the shared
            // queue, executes it, tries to activate its successors, discards ChannelInstances whose reference
            // count has dropped to one, and starts additional worker threads when more than one stage is
            // activated afterwards.
            // NOTE(review): when a breakpoint suspends execution, this method returns early without counting
            // this thread in completedThreads -- confirm that this is intended.

            StageActivator stageActivator;
            this.crossPlatformExecutor.logger.info("Thread {} started", this.threadId);
            // Loop until there is no activated stage or only one thread running
            do {
                // Get the stageActivator for the stage to execute; the queue is polled under the shared lock.
                synchronized (this.crossPlatformExecutor) {
                    stageActivator = this.crossPlatformExecutor.activatedStageActivators.poll();
                    if (stageActivator == null) {
                        // Nothing left to execute: leave the loop and finish this thread.
                        break;
                    }
                }
                this.crossPlatformExecutor.logger.info("{} started executing Stage: {}:", this.threadId, stageActivator.getStage());

                // Check if #breakpoint permits the execution.
                if (!this.thread_isBreakpointDisabled && this.crossPlatformExecutor.suspendIfBreakpointRequest(stageActivator)) {
                    return;
                }

                // Otherwise, execute the stage.

                final ExecutionStage stage = stageActivator.getStage();
                final OptimizationContext optimizationContext = stageActivator.getOptimizationContext();

                // Find parts of the stage to instrument.
                this.crossPlatformExecutor.instrumentationStrategy.applyTo(stage);

                // Obtain an Executor for the stage.
                final Executor executor = this.crossPlatformExecutor.getOrCreateExecutorFor(stage);

                // Have the execution done.
                CrossPlatformExecutor.this.logger.info("Having {} execute {}:\n{}", executor, stage, stage.getPlanAsString("> "));
                final long startTime = System.currentTimeMillis();

                // Only one thread at a time may use a given executor. Locking on this.crossPlatformExecutor
                // instead would additionally serialize stages that run the same operators on different
                // executors, should that ever turn out to be necessary.
                synchronized (executor) {
                    executor.execute(stage, optimizationContext, this.crossPlatformExecutor);
                    final long finishTime = System.currentTimeMillis();

                    CrossPlatformExecutor.this.logger.info("Executed {} in {}.", stage, Formats.formatDuration(finishTime - startTime, true));

                    // Remember that we have executed the stage.
                    this.crossPlatformExecutor.completedStages.add(stage);
                    if (stage.isLoopHead()) {
                        this.crossPlatformExecutor.getOrCreateLoopContext(stage.getLoop()).scrapPreviousTransitionContext();
                    }

                    // Try to activate the successor stages.
                    this.crossPlatformExecutor.tryToActivateSuccessors(stageActivator);
                }
                // We can now dispose the stageActivator that collected the input ChannelInstances.
                stageActivator.dispose();

                // Dispose obsolete ChannelInstances.
                final Iterator<Map.Entry<Channel, ChannelInstance>> iterator = this.crossPlatformExecutor.channelInstances.entrySet().iterator();
                while (iterator.hasNext()) {
                    final Map.Entry<Channel, ChannelInstance> channelInstanceEntry = iterator.next();
                    final ChannelInstance channelInstance = channelInstanceEntry.getValue();

                    // If the registry entry holds the only remaining reference to this ChannelInstance, discard it.
                    if (channelInstance.getNumReferences() == 1) {
                        channelInstance.noteDiscardedReference(true);
                        iterator.remove();
                    }

                }

                this.crossPlatformExecutor.logger.info("{} completed executing Stage : {}:", this.threadId, stageActivator.getStage());

                // Create new threads for more than one activated stages recursively
                // NOTE(review): the queue size is read here without holding the lock and is re-evaluated on
                // every iteration while freshly started threads may already be polling from the queue --
                // confirm that spawning fewer threads than (size - 1) is acceptable.
                if (CrossPlatformExecutor.this.activatedStageActivators.size() > 1) {
                    // Create new threads other than the existing thread
                    for (int i = 1; i <= CrossPlatformExecutor.this.activatedStageActivators.size() - 1; i++) {
                        // TODO: Better use Java's ForkJoinPool to reduce thread creation overhead and control concurrency.
                        // Create parallel stage execution thread
                        Thread thread = new Thread(new ParallelExecutionThread(this.thread_isBreakpointDisabled, "T" + i + "@" + this.threadId, this.crossPlatformExecutor));
                        thread.start();
                        synchronized (this.crossPlatformExecutor) {
                            // Add the created thread to {@link #parallelExecutionThreads}
                            CrossPlatformExecutor.this.parallelExecutionThreads.add(thread);
                        }
                    }
                }

            }

            // TODO: Do not busy-wait (could be solved with the ForkJoinPool as well).
            while (CrossPlatformExecutor.this.activatedStageActivators.size() >= 1 &&
                    CrossPlatformExecutor.this.parallelExecutionThreads.size() - CrossPlatformExecutor.this.completedThreads > 1);

            // Count this thread as completed. Even if the counter is volatile, "++" is a separate read and
            // write, so concurrent increments from several worker threads could be lost; volatile only
            // guarantees visibility. Perform the update under the lock the worker threads already share.
            synchronized (this.crossPlatformExecutor) {
                CrossPlatformExecutor.this.completedThreads++;
            }

            // Notify thread ended
            CrossPlatformExecutor.this.logger.info("{} ended", this.threadId);
        }