private void executeAndWaitForCompletion()

in jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java [376:445]


    /**
     * Submits the partition work units of this execution and blocks until each
     * of them has reported completion.
     *
     * <p>Returns immediately when the job's batch status is {@code STOPPING} or
     * when there are no work units to run in this execution. Otherwise at most
     * {@code threads} work units are submitted up front; every time a partition
     * reports completion the next not-yet-submitted work unit (if any) is
     * submitted, until all work units of this execution have completed.
     *
     * <p>When an analyzer proxy is present, events are read from
     * {@code analyzerStatusQueue}: collector data is forwarded to the analyzer
     * without counting as a completion, while a status event is forwarded and
     * then paired with one entry taken from {@code completedWorkQueue}.
     *
     * @throws JobRestartException declared by the signature; presumably raised
     *         by {@code kernelService.restartGeneratedJob} (not visible here)
     * @throws BatchContainerRuntimeException if the waiting thread is
     *         interrupted; the thread's interrupt flag is restored first
     * @throws IllegalStateException if the analyzer queue yields an event whose
     *         type is neither collector data nor status
     */
    private void executeAndWaitForCompletion() throws JobRestartException {

        if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)) {
            return;
        }

        int numTotalForThisExecution = parallelBatchWorkUnits.size();
        this.numPreviouslyCompleted = partitions - numTotalForThisExecution;
        int numCurrentCompleted = 0;
        int numCurrentSubmitted = 0;

        // All partitions have already completed on a previous execution.
        if (numTotalForThisExecution == 0) {
            return;
        }

        // Start up to the max number we are allowed by the threads attribute.
        for (int i = 0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
            final BatchWorkUnit workUnit = parallelBatchWorkUnits.get(i);
            if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
                kernelService.restartGeneratedJob(workUnit);
            } else {
                kernelService.startGeneratedJob(workUnit);
            }
        }

        while (true) {
            try {
                if (analyzerProxy != null) {
                    final PartitionDataWrapper dataWrapper = analyzerStatusQueue.take();
                    if (PartitionEventType.ANALYZE_COLLECTOR_DATA.equals(dataWrapper.getEventType())) {
                        try {
                            analyzerProxy.analyzeCollectorData(dataWrapper.getCollectorData());
                        } catch (Exception e) {
                            ExceptionConfig.wrapBatchException(e);
                        }
                        // Collector data does not mean a partition finished, so go back
                        // to waiting without being ready to submit another work unit.
                        continue;
                    } else if (PartitionEventType.ANALYZE_STATUS.equals(dataWrapper.getEventType())) {
                        try {
                            analyzerProxy.analyzeStatus(dataWrapper.getBatchstatus(), dataWrapper.getExitStatus());
                        } catch (Exception e) {
                            ExceptionConfig.wrapBatchException(e);
                        }
                        completedWork.add(completedWorkQueue.take());  // Shouldn't be a long wait.
                    } else {
                        throw new IllegalStateException("Invalid partition state");
                    }
                } else {
                    // Block until at least one thread has finished so that more batch
                    // work can be submitted. Hold on to the finished work to look at later.
                    completedWork.add(completedWorkQueue.take());
                }
            } catch (final InterruptedException e) {
                // Restore the interrupt flag so callers further up the stack can still
                // observe that this thread was asked to stop.
                Thread.currentThread().interrupt();
                throw new BatchContainerRuntimeException(e);
            }

            numCurrentCompleted++;
            if (numCurrentCompleted >= numTotalForThisExecution) {
                break;
            }

            if (numCurrentSubmitted < numTotalForThisExecution) {
                final BatchWorkUnit nextWorkUnit = parallelBatchWorkUnits.get(numCurrentSubmitted++);
                // Must use the same start-vs-restart decision as the initial submission
                // loop above; the previous code had the two branches swapped and ignored
                // the partitions-override flag.
                if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
                    kernelService.restartGeneratedJob(nextWorkUnit);
                } else {
                    kernelService.startGeneratedJob(nextWorkUnit);
                }
            }
        }
    }