in streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java [157:223]
/**
 * Prepares the provider, reads from it according to {@code this.type}, and hands
 * every result set to {@code flushResults}.
 *
 * <ul>
 *   <li>{@code PERPETUAL}: starts the stream and keeps polling {@code readCurrent()}
 *       while {@code isRunning()} is true. Consecutive empty polls are counted in
 *       {@code zeros}; once that count exceeds {@code timeout / sleepTime} the task
 *       clears {@code keepRunning}. A negative {@code timeout} disables that limit.</li>
 *   <li>{@code READ_CURRENT}, {@code READ_NEW}, {@code READ_RANGE}: perform a single
 *       read and flush its result.</li>
 * </ul>
 *
 * Any {@link Throwable} raised while reading is logged rather than rethrown. On the way
 * out the provider is cleaned up and the {@code started}/{@code keepRunning} flags are
 * always updated, even when the clean-up itself fails.
 */
public void run() {
    try {
        this.provider.prepare(this.config); //TODO allow for configuration objects
        StreamsResultSet resultSet = null;
        //Negative values mean we want to run forever
        long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
        if (this.counter == null) { //should never be null
            this.counter = new StreamsTaskCounter(this.provider.getClass().getName() + UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
        }
        switch (this.type) {
            case PERPETUAL: {
                provider.startStream();
                this.started.set(true);
                while (this.isRunning()) {
                    try {
                        long startTime = System.currentTimeMillis();
                        resultSet = provider.readCurrent();
                        this.counter.addTime(System.currentTimeMillis() - startTime);
                        // Track how many polls in a row came back empty.
                        if (resultSet.size() == 0) {
                            zeros++;
                        } else {
                            zeros = 0;
                        }
                        flushResults(resultSet);
                        // the way this works needs to change...
                        if (zeros > maxZeros) {
                            this.keepRunning.set(false);
                        }
                        // Only back off when the last poll produced nothing.
                        if (zeros > 0) {
                            Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
                        }
                    } catch (Exception e) {
                        this.counter.incrementErrorCount();
                        // Pass the exception to the logger so the cause and stack trace are not lost.
                        LOGGER.warn("Thread exception", e);
                        this.keepRunning.set(false);
                    }
                }
                Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
            }
            break;
            case READ_CURRENT:
                resultSet = this.provider.readCurrent();
                this.started.set(true);
                break;
            case READ_NEW:
                resultSet = this.provider.readNew(this.sequence);
                this.started.set(true);
                break;
            case READ_RANGE:
                resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
                this.started.set(true);
                break;
            default:
                throw new RuntimeException("Type has not been added to StreamsProviderTask.");
        }
        // NOTE(review): in PERPETUAL mode resultSet still refers to the last polled set,
        // so it is handed to flushResults a second time here - confirm this is intended.
        if (resultSet != null) {
            flushResults(resultSet);
        }
    } catch (Throwable e) {
        LOGGER.error("Caught Throwable in Provider {}", this.provider.getClass().getSimpleName(), e);
    } finally {
        try {
            Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
            LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
            this.provider.cleanUp();
        } finally {
            // The flags are updated in a nested finally so that a failure during clean-up
            // cannot leave them in their pre-shutdown state.
            //Setting started to 'true' here will allow the isRunning() method to return false in the event of an exception
            //before started would normally be set to true in the run method.
            this.started.set(true);
            this.keepRunning.set(false);
        }
    }
}