in jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java [213:266]
private List<Object> readAndProcess() {
List<Object> chunkToWrite = new ArrayList<Object>();
Object itemRead;
Object itemProcessed;
while (true) {
currentItemStatus = new SingleItemStatus();
currentChunkStatus.incrementItemsTouchedInCurrentChunk();
itemRead = readItem();
if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {
break;
}
if (!currentItemStatus.isSkipped() && !currentChunkStatus.isFinished()) {
itemProcessed = processItem(itemRead);
if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {
break;
}
if (!currentItemStatus.isSkipped() && !currentItemStatus.isFiltered()) {
chunkToWrite.add(itemProcessed);
}
}
// Break out of the loop to deliver one-at-a-time processing after rollback.
// No point calling isReadyToCheckpoint(), we know we're done. Let's not
// complicate the checkpoint algorithm to hold this logic, just break right here.
if (currentChunkStatus.isRetryingAfterRollback()) {
break;
}
// write buffer size reached
// This will force the current item to finish processing on a stop request
if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
currentChunkStatus.setFinished(true);
}
// The spec, in Sec. 11.10, Chunk with Custom Checkpoint Processing, clearly
// outlines that this gets called even when we've already read a null (which
// arguably is pointless). But we'll follow the spec.
if (checkpointManager.applyCheckPointPolicy()) {
break;
}
// last record in readerProxy reached
if (currentChunkStatus.isFinished()) {
break;
}
}
return chunkToWrite;
}