private void shutDownTask()

in streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java [358:397]


  /**
   * Stops the tasks belonging to {@code comp} once every task of its upstream components has
   * stopped running, then applies the same procedure to each downstream component.
   *
   * <p>A component with no entry in {@code streamTasks} (a StreamProvider, per the original
   * comments) has nothing to stop here; the traversal simply continues with its children.
   * If any upstream task is still running, this component's tasks are left untouched, but the
   * traversal still descends into its children.
   *
   * @param comp the component whose tasks should be stopped
   * @param streamTasks running tasks keyed by component id
   * @throws InterruptedException declared for callers; the waiting done here is uninterruptible
   */
  private void shutDownTask(StreamComponent comp, Map<String, List<StreamsTask>> streamTasks) throws InterruptedException {
    List<StreamsTask> tasks = streamTasks.get(comp.getId());
    // tasks == null means the component is not task-backed (a StreamProvider).
    if (tasks != null && areParentTasksStopped(comp, streamTasks)) {
      // Signal every task first so that they can all wind down in parallel.
      for (StreamsTask task : tasks) {
        task.stopTask();
        // A waiting task has no data to process; cancelling its future interrupts the blocked
        // queue read. The null check keeps one task without a registered future from aborting
        // the shutdown of the remaining tasks with a NullPointerException.
        if (task.isWaiting() && this.futures.get(task) != null) {
          this.futures.get(task).cancel(true);
        }
      }
      for (StreamsTask task : tasks) {
        awaitTaskTermination(task);
      }
    }

    Collection<StreamComponent> children = comp.getDownStreamComponents();
    if (children != null) {
      for (StreamComponent child : children) {
        shutDownTask(child, streamTasks);
      }
    }
  }

  /**
   * Returns {@code true} when no task of any upstream component of {@code comp} is running.
   * Upstream components without an entry in {@code streamTasks} are providers and are treated
   * as already stopped.
   */
  private boolean areParentTasksStopped(StreamComponent comp, Map<String, List<StreamsTask>> streamTasks) {
    for (StreamComponent parent : comp.getUpStreamComponents()) {
      List<StreamsTask> parentTasks = streamTasks.get(parent.getId());
      if (parentTasks == null) {
        continue;
      }
      for (StreamsTask parentTask : parentTasks) {
        if (parentTask.isRunning()) {
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Waits until {@code task} stops running or the configured task timeout elapses, polling at
   * most once per second, and logs a warning if the task is still running afterwards.
   */
  private void awaitTaskTermination(StreamsTask task) {
    long timeoutMs = localRuntimeConfiguration.getTaskTimeoutMs();
    // Track a wall-clock deadline rather than counting whole seconds, so that the sub-second
    // part of the timeout is honoured instead of being truncated by integer division.
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    long maxPollNanos = TimeUnit.SECONDS.toNanos(1);
    while (task.isRunning()) {
      long remainingNanos = deadlineNanos - System.nanoTime();
      if (remainingNanos <= 0) {
        break;
      }
      Uninterruptibles.sleepUninterruptibly(Math.min(remainingNanos, maxPollNanos), TimeUnit.NANOSECONDS);
    }

    if (task.isRunning()) {
      LOGGER.warn("Task {} failed to terminate in allotted timeframe", task.toString());
    }
  }