in jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java [348:415]
/**
 * Runs the chunk's item processor on a single item, invoking every registered
 * {@link ItemProcessListener} before and after the processor call.
 *
 * <p>When the processor returns {@code null} the current item is flagged as
 * filtered. When the processor or one of the before/after listeners throws an
 * {@link Exception}, each listener's {@code onProcessError} is invoked and the
 * failure is then resolved as a retry, a skip, or a fatal error. The order in
 * which retry and skip are consulted depends on whether the chunk is currently
 * retrying after a rollback: skip is asked first in that mode, retry first
 * otherwise.
 *
 * @param itemRead the item handed over by the reader
 * @return {@code itemRead} itself when no processor is configured; otherwise the
 *         value held for the processed item when the method completes (this is
 *         {@code null} for a filtered item and whenever the processor call did
 *         not produce a value before a skip or rollback-retry was decided)
 * @throws BatchContainerRuntimeException if the failure is neither retryable nor
 *         skippable, or if a non-{@link Exception} {@link Throwable} is raised
 */
private Object processItem(final Object itemRead) {
    // Without a processor the item passes through unchanged.
    if (processorProxy == null) {
        return itemRead;
    }

    Object result = null;
    try {
        for (final ItemProcessListener listener : itemProcessListeners) {
            listener.beforeProcess(itemRead);
        }

        result = processorProxy.processItem(itemRead);
        if (result == null) {
            currentItemStatus.setFiltered(true);
        }

        for (final ItemProcessListener listener : itemProcessListeners) {
            listener.afterProcess(itemRead, result);
        }
    } catch (final Exception failure) {
        // Every listener gets the error callback; a failure raised by a listener
        // itself is handed to ExceptionConfig.wrapBatchException.
        for (final ItemProcessListener listener : itemProcessListeners) {
            try {
                listener.onProcessError(itemRead, failure);
            } catch (final Exception listenerFailure) {
                ExceptionConfig.wrapBatchException(listenerFailure);
            }
        }

        // Decide between retry and skip. Only the first matching check wins and
        // the second check is not evaluated once the first one has matched, so
        // the evaluation order below must be kept as is.
        final boolean retry;
        final boolean skip;
        if (currentChunkStatus.isRetryingAfterRollback()) {
            skip = skipProcessException(failure, itemRead);
            retry = !skip && retryProcessException(failure, itemRead);
        } else {
            retry = retryProcessException(failure, itemRead);
            skip = !retry && skipProcessException(failure, itemRead);
        }

        if (retry) {
            if (retryHandler.isRollbackException(failure)) {
                // retry with rollback
                currentChunkStatus.markForRollbackWithRetry(failure);
            } else {
                // retry without rollback: process the same item again
                result = processItem(itemRead);
            }
        } else if (skip) {
            currentItemStatus.setSkipped(true);
            stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
        } else {
            throw new BatchContainerRuntimeException(failure);
        }
    } catch (final Throwable fatal) {
        throw new BatchContainerRuntimeException(fatal);
    }
    return result;
}