in jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java [538:638]
private void invokeChunk() {
try {
transactionManager.begin();
this.openReaderAndWriter();
transactionManager.commit();
} catch (final Exception e) {
rollback(e);
return;
}
try {
while (true) {
currentChunkStatus = getNextChunkStatusBasedOnPrevious();
// Sequence surrounding beginCheckpoint() updated per MR
// https://java.net/bugzilla/show_bug.cgi?id=5873
setNextChunkTransactionTimeout();
// Remember we "wrap" the built-in item-count + time-limit "algorithm"
// in a CheckpointAlgorithm for ease in keeping the sequence consistent
checkpointManager.beginCheckpoint();
transactionManager.begin();
for (ChunkListener chunkProxy : chunkListeners) {
chunkProxy.beforeChunk();
}
final List<Object> chunkToWrite = readAndProcess();
if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {
rollbackAfterRetryableException();
continue;
}
// MR 1.0 Rev A clarified we'd only write a chunk with at least one item.
// See, e.g. Sec 11.6 of Spec
if (chunkToWrite.size() > 0) {
writeChunk(chunkToWrite);
}
if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {
rollbackAfterRetryableException();
continue;
}
for (ChunkListener chunkProxy : chunkListeners) {
chunkProxy.afterChunk();
}
Map<CheckpointDataKey, CheckpointData> checkpoints = checkpointManager.prepareCheckpoints();
PersistentDataWrapper userData = resolveUserData();
try {
transactionManager.commit();
storeUserData(userData);
checkpointManager.storeCheckPoints(checkpoints);
} catch (Exception e) {
// only set the Exception if we didn't blow up before anyway
if (this.stepContext.getException() != null) {
this.stepContext.setException(e);
}
if (e instanceof BatchRuntimeException) {
throw e;
}
throw new BatchContainerServiceException("Cannot commit the transaction for the step.", e);
}
checkpointManager.endCheckpoint();
invokeCollectorIfPresent();
updateNormalMetrics(chunkToWrite.size());
// exit loop when last record is written
if (currentChunkStatus.isFinished()) {
transactionManager.begin();
if (doClose()) {
transactionManager.commit();
} else {
transactionManager.rollback();
}
break;
}
}
} catch (final Exception e) {
logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
// Only try to call onError() if we have an Exception, but not an Error.
for (ChunkListener chunkProxy : chunkListeners) {
try {
chunkProxy.onError(e);
} catch (final Exception e1) {
logger.log(Level.SEVERE, e1.getMessage(), e1);
}
}
rollback(e);
} catch (final Throwable t) {
rollback(t);
}
}