in streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java [107:154]
/**
 * Main loop of the persist-writer task.
 *
 * <p>Prepares the writer with the stream configuration, lazily creates the task counter,
 * then repeatedly polls the input queue (waiting up to 5 seconds per poll) and hands every
 * received datum to the writer. The loop ends when {@code keepRunning} is cleared, which
 * this method itself does when the polling thread is interrupted or when a write throws.
 *
 * <p>On the way out the method waits one batch interval, asks the writer to clean up and
 * clears {@code isRunning}. {@code isRunning} is cleared even if the wait or the clean-up
 * throws; in that case the exception still propagates to the caller of {@code run()}.
 */
public void run() {
    try {
        this.writer.prepare(this.streamConfig);
        if (this.counter == null) {
            // A random UUID suffix is appended to the writer class name when building the counter.
            this.counter = new StreamsTaskCounter(this.writer.getClass().getName() + UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
        }
        while (this.keepRunning.get()) {
            StreamsDatum datum = null;
            try {
                // 'blocked' is raised only for the duration of the poll.
                this.blocked.set(true);
                datum = this.inQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException ie) {
                LOGGER.debug("Received InterruptedException. Shutting down and re-applying interrupt status.");
                this.keepRunning.set(false);
                if (!this.inQueue.isEmpty()) {
                    LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}", this.inQueue.size(), this.writer.getClass().getName());
                }
                // Restore the interrupt flag so code further up the stack can observe it.
                Thread.currentThread().interrupt();
            } finally {
                this.blocked.set(false);
            }
            if (datum != null) {
                this.counter.incrementReceivedCount();
                try {
                    long startTime = System.currentTimeMillis();
                    this.writer.write(datum);
                    // Elapsed time is only recorded for writes that complete without throwing.
                    this.counter.addTime(System.currentTimeMillis() - startTime);
                    statusCounter.incrementStatus(DatumStatus.SUCCESS);
                } catch (Exception e) {
                    LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e);
                    this.keepRunning.set(false); // why do we shutdown on a failed write ?
                    statusCounter.incrementStatus(DatumStatus.FAIL);
                    DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass());
                    this.counter.incrementErrorCount();
                }
            } else {
                // datum is still null when the poll produced nothing or was interrupted.
                LOGGER.trace("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName());
            }
        }
        // NOTE(review): the finally block below waits for the same interval again, so a normal
        // exit waits twice. Kept as-is to preserve shutdown timing; confirm whether both are needed.
        Uninterruptibles.sleepUninterruptibly(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
    } catch (Throwable e) {
        // One placeholder for the writer name; the throwable is the trailing argument, the same
        // form used by the write-failure log above, so it is logged as an exception.
        LOGGER.error("Caught Throwable in Persist Writer {}", this.writer.getClass().getSimpleName(), e);
    } finally {
        try {
            Uninterruptibles.sleepUninterruptibly(streamConfig.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
            this.writer.cleanUp();
        } finally {
            // Must run even when the wait or cleanUp() throws, otherwise the task would keep
            // reporting itself as running after this method has exited.
            this.isRunning.set(false);
        }
    }
}