in streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java [111:166]
/*
 * Main loop of the processor task.
 *
 * Prepares the processor with the stream configuration, lazily creates the
 * task counter, then polls the inbound queue until keepRunning is cleared.
 * Every non-null datum is handed to the processor, and each datum the
 * processor returns is pushed to the outgoing queue. A failure while handling
 * a single datum is counted, logged and recorded on that datum's metadata, and
 * the loop continues. An InterruptedException clears keepRunning and restores
 * the thread's interrupt flag. On exit, isRunning is cleared and the
 * processor's cleanUp() is invoked, whether the loop ended normally or not.
 */
public void run() {
    try {
        this.processor.prepare(this.streamConfig);
        // Only create a counter if one has not already been assigned.
        if(this.counter == null) {
            this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
        }
        while(this.keepRunning.get()) {
            StreamsDatum datum = null;
            try {
                // The blocked flag is raised only for the duration of the timed poll.
                this.blocked.set(true);
                datum = this.inQueue.poll(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException ie) {
                LOGGER.debug("Received InteruptedException, shutting down and re-applying interrupt status.");
                this.keepRunning.set(false);
                // Anything still queued at this point will not be processed by this task.
                if(!this.inQueue.isEmpty()) {
                    LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}",this.inQueue.size(), this.processor.getClass().getName());
                }
                Thread.currentThread().interrupt();
            } finally {
                this.blocked.set(false);
            }
            if(datum != null) {
                this.counter.incrementReceivedCount();
                try {
                    // Processing time is recorded only when process() returns normally.
                    long startTime = System.currentTimeMillis();
                    List<StreamsDatum> output = this.processor.process(datum);
                    this.counter.addTime(System.currentTimeMillis() - startTime);
                    if(output != null) {
                        for(StreamsDatum outDatum : output) {
                            super.addToOutgoingQueue(outDatum);
                            this.counter.incrementEmittedCount();
                            statusCounter.incrementStatus(DatumStatus.SUCCESS);
                        }
                    }
                } catch (InterruptedException ie) {
                    LOGGER.warn("Received InterruptedException, shutting down and re-applying interrupt status.");
                    this.keepRunning.set(false);
                    Thread.currentThread().interrupt();
                } catch (Throwable t) {
                    this.counter.incrementErrorCount();
                    // Pass the throwable as the trailing argument with no placeholder of its own,
                    // so the logger records it as an exception (with stack trace), matching the
                    // outer handler below.
                    LOGGER.warn("Caught Throwable in processor, {}", this.processor.getClass().getName(), t);
                    statusCounter.incrementStatus(DatumStatus.FAIL);
                    //Add the error to the metadata, but keep processing
                    DatumUtils.addErrorToMetadata(datum, t, this.processor.getClass());
                }
            } else {
                // poll() timed out (or was interrupted) without yielding a datum.
                LOGGER.trace("Removed NULL datum from queue at processor : {}", this.processor.getClass().getName());
            }
        }
    } catch(Throwable e) {
        // Failures outside per-datum handling (e.g. in prepare()) end the task.
        LOGGER.error("Caught Throwable in Processor {}", this.processor.getClass().getSimpleName(), e);
    } finally {
        this.isRunning.set(false);
        this.processor.cleanUp();
    }
}