private void invokeChunk()

in jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java [538:638]


    private void invokeChunk() {

        try {
            transactionManager.begin();
            this.openReaderAndWriter();
            transactionManager.commit();
        } catch (final Exception e) {
            rollback(e);
            return;
        }
        try {
            while (true) {

                currentChunkStatus = getNextChunkStatusBasedOnPrevious();

                // Sequence surrounding beginCheckpoint() updated per MR
                // https://java.net/bugzilla/show_bug.cgi?id=5873
                setNextChunkTransactionTimeout();

                // Remember we "wrap" the built-in item-count + time-limit "algorithm"
                // in a CheckpointAlgorithm for ease in keeping the sequence consistent
                checkpointManager.beginCheckpoint();

                transactionManager.begin();

                for (ChunkListener chunkProxy : chunkListeners) {
                    chunkProxy.beforeChunk();
                }

                final List<Object> chunkToWrite = readAndProcess();

                if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {
                    rollbackAfterRetryableException();
                    continue;
                }

                // MR 1.0 Rev A clarified we'd only write a chunk with at least one item.
                // See, e.g. Sec 11.6 of Spec
                if (chunkToWrite.size() > 0) {
                    writeChunk(chunkToWrite);
                }

                if (currentChunkStatus.wasMarkedForRollbackWithRetry()) {
                    rollbackAfterRetryableException();

                    continue;
                }

                for (ChunkListener chunkProxy : chunkListeners) {
                    chunkProxy.afterChunk();
                }

                Map<CheckpointDataKey, CheckpointData> checkpoints = checkpointManager.prepareCheckpoints();
                PersistentDataWrapper userData = resolveUserData();
                try {
                    transactionManager.commit();
                    storeUserData(userData);
                    checkpointManager.storeCheckPoints(checkpoints);
                } catch (Exception e) {
                    // only set the Exception if we didn't blow up before anyway
                    if (this.stepContext.getException() != null) {
                        this.stepContext.setException(e);
                    }
                    if (e instanceof BatchRuntimeException) {
                        throw e;
                    }
                    throw new BatchContainerServiceException("Cannot commit the transaction for the step.", e);
                }

                checkpointManager.endCheckpoint();

                invokeCollectorIfPresent();

                updateNormalMetrics(chunkToWrite.size());

                // exit loop when last record is written
                if (currentChunkStatus.isFinished()) {
                    transactionManager.begin();
                    if (doClose()) {
                        transactionManager.commit();
                    } else {
                        transactionManager.rollback();
                    }
                    break;
                }
            }
        } catch (final Exception e) {
            logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
            // Only try to call onError() if we have an Exception, but not an Error.
            for (ChunkListener chunkProxy : chunkListeners) {
                try {
                    chunkProxy.onError(e);
                } catch (final Exception e1) {
                    logger.log(Level.SEVERE, e1.getMessage(), e1);
                }
            }
            rollback(e);
        } catch (final Throwable t) {
            rollback(t);
        }
    }