public void run()

in streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java [107:154]


  /**
   * Runs the persist-writer loop.
   *
   * <p>Prepares the writer with the stream configuration, then repeatedly polls the input queue
   * (waiting up to five seconds per poll) and hands every datum it receives to the writer. The
   * loop ends once {@code keepRunning} is false; this method clears that flag itself when the
   * thread is interrupted and after the first failed write. On the way out the writer is
   * cleaned up and {@code isRunning} is set to false, even if clean-up itself throws.
   */
  public void run() {
    try {
      this.writer.prepare(this.streamConfig);
      if (this.counter == null) {
        this.counter = new StreamsTaskCounter(this.writer.getClass().getName() + UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
      }
      while (this.keepRunning.get()) {
        StreamsDatum datum = null;
        try {
          // Flag the task as blocked for the duration of the (possibly long) queue poll.
          this.blocked.set(true);
          datum = this.inQueue.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
          LOGGER.debug("Received InterruptedException. Shutting down and re-applying interrupt status.");
          this.keepRunning.set(false);
          if (!this.inQueue.isEmpty()) {
            LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}",this.inQueue.size(), this.writer.getClass().getName());
          }
          // Restore the interrupt flag so code further up the stack can observe it.
          Thread.currentThread().interrupt();
        } finally {
          this.blocked.set(false);
        }
        if (datum != null) {
          this.counter.incrementReceivedCount();
          try {
            long startTime = System.currentTimeMillis();
            this.writer.write(datum);
            // Write time is only recorded for writes that complete without throwing.
            this.counter.addTime(System.currentTimeMillis() - startTime);
            statusCounter.incrementStatus(DatumStatus.SUCCESS);
          } catch (Exception e) {
            LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e);
            // NOTE(review): a single failed write stops the whole task; the original author
            // questioned this as well ("why do we shutdown on a failed write ?").
            this.keepRunning.set(false);
            statusCounter.incrementStatus(DatumStatus.FAIL);
            DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass());
            this.counter.incrementErrorCount();
          }
        } else {
          // A null datum is expected here: poll() returns null when the wait times out, and
          // datum also stays null when the poll was interrupted.
          LOGGER.trace("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName());
        }
      }
      // NOTE(review): on the normal path this sleep is followed by an identical one in the
      // finally block below, so the task waits two batch periods before clean-up. Kept to
      // preserve existing timing; confirm whether both are intended.
      Uninterruptibles.sleepUninterruptibly(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
    } catch (Throwable e) {
      // Pass the Throwable as the trailing argument without a placeholder of its own so the
      // logger records it as the exception (with stack trace) rather than formatting it as text.
      LOGGER.error("Caught Throwable in Persist Writer {}", this.writer.getClass().getSimpleName(), e);
    } finally {
      try {
        Uninterruptibles.sleepUninterruptibly(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
        this.writer.cleanUp();
      } finally {
        // Always mark the task as stopped, even when the sleep or clean-up above throws.
        this.isRunning.set(false);
      }
    }
  }