streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java [203:241]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public void stop() {
    // Shutdown sequence: flush(), then close(), then stop the backgroundFlushTask executor.
    // An IOException from flush() or close() is logged and does not abort the remaining steps.
    // NOTE(review): close() runs before the executor is shut down; confirm that no
    // still-scheduled task can touch resources released by close().
    try {
      flush();
    } catch (IOException flushFailure) {
      LOGGER.error("Error flushing", flushFailure);
    }

    try {
      close();
    } catch (IOException closeFailure) {
      LOGGER.error("Error closing", closeFailure);
    }

    try {
      // Orderly shutdown first: give the executor up to 15 seconds to terminate.
      backgroundFlushTask.shutdown();
      boolean terminatedGracefully = backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS);
      if (terminatedGracefully) {
        return;
      }

      // Grace period expired: request cancellation and wait another 15 seconds.
      backgroundFlushTask.shutdownNow();
      boolean terminatedAfterCancel = backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS);
      if (!terminatedAfterCancel) {
        LOGGER.error("Stream did not terminate");
      }
    } catch (InterruptedException interrupted) {
      // The waiting thread itself was interrupted: cancel (again) and
      // restore the interrupt flag so callers can observe it.
      backgroundFlushTask.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public void run() {
    while (true) {
      if (persistQueue.peek() != null) {
        try {
          StreamsDatum entry = persistQueue.remove();
          write(entry);
        } catch (Exception ex) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java [142:181]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public void stop() {
    // Stops this writer in three independent steps: flush(), close(), and shutting down
    // the backgroundFlushTask executor. A failing step is logged and the next one still runs.
    // NOTE(review): the executor is shut down only after close(); verify that a pending
    // scheduled task cannot run against already-closed resources.

    // Step 1: flush, logging any IOException.
    try {
      flush();
    } catch (IOException flushError) {
      LOGGER.error("Error flushing", flushError);
    }

    // Step 2: close, logging any IOException.
    try {
      close();
    } catch (IOException closeError) {
      LOGGER.error("Error closing", closeError);
    }

    // Step 3: two-phase executor shutdown with a 15-second wait per phase.
    try {
      backgroundFlushTask.shutdown();
      boolean finishedInTime = backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS);
      if (finishedInTime) {
        return;
      }

      // Phase two: cancel whatever is still executing, then wait once more.
      backgroundFlushTask.shutdownNow();
      boolean finishedAfterCancel = backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS);
      if (!finishedAfterCancel) {
        LOGGER.error("Stream did not terminate");
      }
    } catch (InterruptedException interruption) {
      // Interrupted while waiting: force cancellation and re-assert the
      // interrupt status on the current thread.
      backgroundFlushTask.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public void run() {

    while (true) {
      if (persistQueue.peek() != null) {
        try {
          StreamsDatum entry = persistQueue.remove();
          write(entry);
        } catch (Exception ex) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



