in jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java [422:480]
private void writeChunk(List<Object> theChunk) {
if (!theChunk.isEmpty()) {
try {
// call read listeners before and after the actual read
for (ItemWriteListener writeListenerProxy : itemWriteListeners) {
writeListenerProxy.beforeWrite(theChunk);
}
writerProxy.writeItems(theChunk);
for (ItemWriteListener writeListenerProxy : itemWriteListeners) {
writeListenerProxy.afterWrite(theChunk);
}
} catch (Exception e) {
this.stepContext.setException(e);
for (ItemWriteListener writeListenerProxy : itemWriteListeners) {
try {
writeListenerProxy.onWriteError(theChunk, e);
} catch (Exception e1) {
ExceptionConfig.wrapBatchException(e1);
}
}
if(!currentChunkStatus.isRetryingAfterRollback()) {
if (retryWriteException(e, theChunk)) {
if (!retryHandler.isRollbackException(e)) {
// retry without rollback
writeChunk(theChunk);
} else {
// retry with rollback
currentChunkStatus.markForRollbackWithRetry(e);
}
} else if (skipWriteException(e, theChunk)) {
stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
} else {
throw new BatchContainerRuntimeException(e);
}
} else {
if (skipWriteException(e, theChunk)) {
stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
} else if (retryWriteException(e, theChunk)) {
if (!retryHandler.isRollbackException(e)) {
// retry without rollback
writeChunk(theChunk);
} else {
// retry with rollback
currentChunkStatus.markForRollbackWithRetry(e);
}
} else {
throw new BatchContainerRuntimeException(e);
}
}
} catch (Throwable e) {
throw new BatchContainerRuntimeException(e);
}
}
}