public void start()

in streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java [208:256]


  public void start() {
    prepare();
    attachShutdownHandler();
    boolean isRunning = true;
    this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
    this.monitor = Executors.newCachedThreadPool();
    Map<String, StreamsProviderTask> provTasks = new HashMap<>();
    tasks = new HashMap<>();
    boolean forcedShutDown = false;

    try {
      if (this.useDeprecatedMonitors) {
        monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
        this.monitor.submit(monitorThread);
      }
      setupComponentTasks(tasks);
      setupProviderTasks(provTasks);
      LOGGER.info("Started stream with {} components", tasks.size());
      while(isRunning) {
        Uninterruptibles.sleepUninterruptibly(localRuntimeConfiguration.getShutdownCheckDelay(), TimeUnit.MILLISECONDS);
        isRunning = false;
        for(StreamsProviderTask task : provTasks.values()) {
          isRunning = isRunning || task.isRunning();
        }
        for(StreamComponent task: components.values()) {
          boolean tasksRunning = false;
          for(StreamsTask t : task.getStreamsTasks()) {
            if(t instanceof BaseStreamsTask) {
              tasksRunning = tasksRunning || t.isRunning();
            }
          }
          isRunning = isRunning || (tasksRunning && task.getInBoundQueue().size() > 0);
        }
        if(isRunning) {
          Uninterruptibles.sleepUninterruptibly(localRuntimeConfiguration.getShutdownCheckInterval(), TimeUnit.MILLISECONDS);
        }
      }
      LOGGER.info("Components are no longer running or timed out");
    } catch (Exception e){
      LOGGER.warn("Runtime exception.  Beginning shutdown");
      forcedShutDown = true;
    } finally{
      LOGGER.info("Stream has completed, pausing @ {}", System.currentTimeMillis());
      Uninterruptibles.sleepUninterruptibly(localRuntimeConfiguration.getShutdownPauseMs(), TimeUnit.MILLISECONDS);
      LOGGER.info("Stream has completed, shutting down @ {}", System.currentTimeMillis());
      stopInternal(forcedShutDown);
    }

  }