in streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java [208:256]
public void start() {
prepare();
attachShutdownHandler();
boolean isRunning = true;
this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
this.monitor = Executors.newCachedThreadPool();
Map<String, StreamsProviderTask> provTasks = new HashMap<>();
tasks = new HashMap<>();
boolean forcedShutDown = false;
try {
if (this.useDeprecatedMonitors) {
monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
this.monitor.submit(monitorThread);
}
setupComponentTasks(tasks);
setupProviderTasks(provTasks);
LOGGER.info("Started stream with {} components", tasks.size());
while(isRunning) {
Uninterruptibles.sleepUninterruptibly(localRuntimeConfiguration.getShutdownCheckDelay(), TimeUnit.MILLISECONDS);
isRunning = false;
for(StreamsProviderTask task : provTasks.values()) {
isRunning = isRunning || task.isRunning();
}
for(StreamComponent task: components.values()) {
boolean tasksRunning = false;
for(StreamsTask t : task.getStreamsTasks()) {
if(t instanceof BaseStreamsTask) {
tasksRunning = tasksRunning || t.isRunning();
}
}
isRunning = isRunning || (tasksRunning && task.getInBoundQueue().size() > 0);
}
if(isRunning) {
Uninterruptibles.sleepUninterruptibly(localRuntimeConfiguration.getShutdownCheckInterval(), TimeUnit.MILLISECONDS);
}
}
LOGGER.info("Components are no longer running or timed out");
} catch (Exception e){
LOGGER.warn("Runtime exception. Beginning shutdown");
forcedShutDown = true;
} finally{
LOGGER.info("Stream has completed, pausing @ {}", System.currentTimeMillis());
Uninterruptibles.sleepUninterruptibly(localRuntimeConfiguration.getShutdownPauseMs(), TimeUnit.MILLISECONDS);
LOGGER.info("Stream has completed, shutting down @ {}", System.currentTimeMillis());
stopInternal(forcedShutDown);
}
}