in jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java [447:494]
/**
 * Folds the outcome of every finished partition into this step and notifies the partition reducer.
 * <p>
 * For each entry of {@code completedWork} the persisted step executions of that sub-job are looked up;
 * when exactly one is found, each of its metrics is added to the metric of the same type on
 * {@code stepContext}. A sub-job whose batch status is {@code FAILED} flags the step for rollback and
 * sets the step's own batch status to {@code FAILED}.
 * <p>
 * After all sub-jobs were inspected, the reducer (when one is configured) is asked either to roll
 * back the partitioned step or is told that the step is about to complete. Any exception raised by
 * the reducer is handed to {@code ExceptionConfig.wrapBatchException}; how that helper propagates it
 * is not visible from this method.
 *
 * @throws BatchContainerRuntimeException if at least one partition ended with status {@code FAILED}
 */
private void checkCompletedWork() {
    boolean anyPartitionFailed = false;

    for (final BatchWorkUnit finishedUnit : completedWork) {
        final List<StepExecution> stepExecutions =
            persistenceManagerService.getStepExecutionsForJobExecution(finishedUnit.getJobExecutionImpl().getExecutionId());

        // Metrics are only merged when the sub-job produced exactly one step execution;
        // any other count is silently skipped.
        if (stepExecutions.size() == 1) {
            final StepExecution partitionStep = stepExecutions.get(0);
            for (final Metric partitionMetric : partitionStep.getMetrics()) {
                stepContext.getMetric(partitionMetric.getType()).incValueBy(partitionMetric.getValue());
            }
        }

        // NOTE(review): only FAILED triggers a rollback here; an earlier comment also mentioned
        // stopped sub-jobs - confirm whether STOPPED should be handled as well.
        final BatchStatus partitionStatus = finishedUnit.getJobExecutionImpl().getJobContext().getBatchStatus();
        if (partitionStatus.equals(BatchStatus.FAILED)) {
            anyPartitionFailed = true;
            // Remember the failure on the step; the exception is raised only after
            // the remaining partitions have been inspected.
            stepContext.setBatchStatus(BatchStatus.FAILED);
        }
    }

    if (!anyPartitionFailed) {
        // No partition failed: let the reducer know the step is about to complete.
        if (this.partitionReducerProxy != null) {
            try {
                this.partitionReducerProxy.beforePartitionedStepCompletion();
            } catch (Exception reducerFailure) {
                ExceptionConfig.wrapBatchException(reducerFailure);
            }
        }
        return;
    }

    // At least one partition failed: give the reducer a chance to roll back, then fail the step.
    if (this.partitionReducerProxy != null) {
        try {
            this.partitionReducerProxy.rollbackPartitionedStep();
        } catch (Exception reducerFailure) {
            ExceptionConfig.wrapBatchException(reducerFailure);
        }
    }
    throw new BatchContainerRuntimeException("One or more partitions failed");
}