public void run()

in streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java [157:223]


  /**
   * Executes the provider for this task according to the configured {@code type}.
   *
   * <p>The provider is prepared with the task configuration, then:
   * <ul>
   *   <li>{@code PERPETUAL}: the stream is started and {@code readCurrent()} is polled while
   *       {@link #isRunning()} is true. Each batch is flushed; consecutive empty batches are
   *       counted in {@code zeros}, and once that count exceeds {@code timeout / sleepTime}
   *       the task stops itself. A negative {@code timeout} disables this limit.</li>
   *   <li>{@code READ_CURRENT}, {@code READ_NEW}, {@code READ_RANGE}: a single read is
   *       performed and its result is flushed.</li>
   * </ul>
   *
   * <p>Any {@link Throwable} raised by the body is logged rather than propagated. On exit the
   * provider is cleaned up and the {@code started}/{@code keepRunning} flags are set to
   * {@code true}/{@code false}, even if the clean-up itself fails.
   */
  public void run() {
    try {
      this.provider.prepare(this.config); //TODO allow for configuration objects
      StreamsResultSet resultSet = null;
      //Negative values mean we want to run forever
      long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
      if (this.counter == null) { //should never be null
        this.counter = new StreamsTaskCounter(this.provider.getClass().getName() + UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
      }
      switch (this.type) {
        case PERPETUAL:
          provider.startStream();
          this.started.set(true);
          while (this.isRunning()) {
            try {
              // Only the time spent inside the provider read is recorded on the counter.
              long startTime = System.currentTimeMillis();
              resultSet = provider.readCurrent();
              this.counter.addTime(System.currentTimeMillis() - startTime);
              // Track how many consecutive reads came back empty.
              if (resultSet.size() == 0) {
                zeros++;
              } else {
                zeros = 0;
              }
              flushResults(resultSet);
              // the way this works needs to change...
              if (zeros > maxZeros) {
                this.keepRunning.set(false);
              }
              // Back off only when the last read produced nothing.
              if (zeros > 0) {
                Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
              }
            } catch (Exception e) {
              this.counter.incrementErrorCount();
              // Pass the exception to the logger so the cause and stack trace are not lost.
              LOGGER.warn("Thread exception", e);
              this.keepRunning.set(false);
            }
          }
          Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
          break;
        case READ_CURRENT:
          resultSet = this.provider.readCurrent();
          this.started.set(true);
          break;
        case READ_NEW:
          resultSet = this.provider.readNew(this.sequence);
          this.started.set(true);
          break;
        case READ_RANGE:
          resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
          this.started.set(true);
          break;
        default:
          throw new RuntimeException("Type has not been added to StreamsProviderTask.");
      }
      // NOTE(review): in PERPETUAL mode resultSet still references the last batch read in the
      // loop, so that batch is handed to flushResults a second time here - confirm that
      // flushResults tolerates an already-flushed result set.
      if (resultSet != null) {
        flushResults(resultSet);
      }
    } catch (Throwable e) {
      LOGGER.error("Caught Throwable in Provider {}", this.provider.getClass().getSimpleName(), e);
    } finally {
      try {
        Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
        LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
        this.provider.cleanUp();
      } finally {
        //Setting started to 'true' here will allow the isRunning() method to return false in the event of an exception
        //before started would normally be set to true in the run method.
        //The nested finally guarantees both flags are updated even when cleanUp() throws.
        this.started.set(true);
        this.keepRunning.set(false);
      }
    }
  }