public void run()

in streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java [111:166]


  /**
   * Main loop of the processor task.
   *
   * <p>Prepares the processor with the stream configuration, lazily creates the task counter,
   * and then repeatedly polls the inbound queue while {@code keepRunning} is set. Every datum
   * that is received is handed to the processor; each datum the processor returns is forwarded
   * via {@code addToOutgoingQueue} and counted as emitted/successful.
   *
   * <p>Failure handling:
   * <ul>
   *   <li>An {@link InterruptedException} while polling or while forwarding output clears
   *       {@code keepRunning} and restores the thread's interrupt status.</li>
   *   <li>Any other {@link Throwable} raised while processing a datum is counted as an error,
   *       recorded on the datum's metadata, and the loop continues with the next datum.</li>
   *   <li>Any {@link Throwable} escaping the loop (including from {@code prepare}) is logged.</li>
   * </ul>
   *
   * <p>In all cases {@code isRunning} is cleared and the processor's {@code cleanUp()} is invoked
   * before the method returns.
   */
  public void run() {
    try {
      this.processor.prepare(this.streamConfig);
      if (this.counter == null) {
        // No counter was supplied; create one with a unique id derived from the processor class.
        this.counter = new StreamsTaskCounter(this.processor.getClass().getName() + UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
      }
      while (this.keepRunning.get()) {
        StreamsDatum datum = null;
        try {
          // Flag the task as blocked for the duration of the (timed) poll.
          this.blocked.set(true);
          datum = this.inQueue.poll(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
          LOGGER.debug("Received InteruptedException, shutting down and re-applying interrupt status.");
          this.keepRunning.set(false);
          if (!this.inQueue.isEmpty()) {
            LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}", this.inQueue.size(), this.processor.getClass().getName());
          }
          Thread.currentThread().interrupt();
        } finally {
          this.blocked.set(false);
        }
        if (datum != null) {
          this.counter.incrementReceivedCount();
          try {
            long startTime = System.currentTimeMillis();
            List<StreamsDatum> output = this.processor.process(datum);
            // Only the time spent inside process() is recorded, not the time spent enqueueing output.
            this.counter.addTime(System.currentTimeMillis() - startTime);
            if (output != null) {
              for (StreamsDatum outDatum : output) {
                super.addToOutgoingQueue(outDatum);
                this.counter.incrementEmittedCount();
                statusCounter.incrementStatus(DatumStatus.SUCCESS);
              }
            }
          } catch (InterruptedException ie) {
            LOGGER.warn("Received InterruptedException, shutting down and re-applying interrupt status.");
            this.keepRunning.set(false);
            Thread.currentThread().interrupt();
          } catch (Throwable t) {
            this.counter.incrementErrorCount();
            // Supply an explicit string for the second placeholder and pass the throwable as the
            // trailing argument: SLF4J treats a trailing Throwable as the exception to log, so
            // without the extra argument the second "{}" would be left unfilled.
            LOGGER.warn("Caught Throwable in processor, {} : {}", this.processor.getClass().getName(), t.toString(), t);
            statusCounter.incrementStatus(DatumStatus.FAIL);
            //Add the error to the metadata, but keep processing
            DatumUtils.addErrorToMetadata(datum, t, this.processor.getClass());
          }
        } else {
          // poll() timed out or was interrupted; nothing to process on this iteration.
          LOGGER.trace("Removed NULL datum from queue at processor : {}", this.processor.getClass().getName());
        }
      }
    } catch (Throwable e) {
      LOGGER.error("Caught Throwable in Processor {}", this.processor.getClass().getSimpleName(), e);
    } finally {
      // Mark the task as stopped before cleaning up so observers see the state change
      // even if cleanUp() is slow.
      this.isRunning.set(false);
      this.processor.cleanUp();
    }
  }