in jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java [273:339]
private Object readItem() {
Object itemRead = null;
try {
// call read listeners before and after the actual read
for (ItemReadListener readListenerProxy : itemReadListeners) {
readListenerProxy.beforeRead();
}
itemRead = readerProxy.readItem();
for (ItemReadListener readListenerProxy : itemReadListeners) {
readListenerProxy.afterRead(itemRead);
}
// itemRead == null means we reached the end of
// the readerProxy "resultset"
currentChunkStatus.setFinished(itemRead == null);
} catch (Exception e) {
stepContext.setException(e);
for (ItemReadListener readListenerProxy : itemReadListeners) {
try {
readListenerProxy.onReadError(e);
} catch (Exception e1) {
ExceptionConfig.wrapBatchException(e1);
}
}
if(!currentChunkStatus.isRetryingAfterRollback()) {
if (retryReadException(e)) {
if (!retryHandler.isRollbackException(e)) {
itemRead = readItem();
} else {
// retry with rollback
currentChunkStatus.markForRollbackWithRetry(e);
}
} else if (skipReadException(e)) {
currentItemStatus.setSkipped(true);
stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
} else {
throw new BatchContainerRuntimeException(e);
}
} else {
// coming from a rollback retry
if (skipReadException(e)) {
currentItemStatus.setSkipped(true);
stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
} else if (retryReadException(e)) {
if (!retryHandler.isRollbackException(e)) {
// retry without rollback
itemRead = readItem();
} else {
// retry with rollback
currentChunkStatus.markForRollbackWithRetry(e);
}
} else {
throw new BatchContainerRuntimeException(e);
}
}
} catch (final Throwable e) {
throw new BatchContainerRuntimeException(e);
}
return itemRead;
}