in jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java [376:445]
/**
 * Submits the partition work units of this execution and blocks until every one of them
 * has reported completion.
 *
 * <p>Returns immediately when the job's batch status is {@code STOPPING} or when there is
 * no work unit to run for this execution. Otherwise at most {@code threads} work units are
 * submitted up front; each time a partition completes, the next pending work unit (if any)
 * is submitted, until all work units of this execution have completed.
 *
 * <p>When an analyzer is configured, events are consumed from {@code analyzerStatusQueue}:
 * collector-data events are forwarded to the analyzer without counting as a completion,
 * while status events are forwarded and then paired with an entry taken from
 * {@code completedWorkQueue}.
 *
 * @throws JobRestartException declared for the restart path
 *         (presumably raised by {@code kernelService.restartGeneratedJob} - confirm)
 * @throws BatchContainerRuntimeException if the waiting thread is interrupted
 * @throws IllegalStateException if the analyzer queue yields an unknown event type
 */
private void executeAndWaitForCompletion() throws JobRestartException {
    if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)) {
        return;
    }

    int numTotalForThisExecution = parallelBatchWorkUnits.size();
    this.numPreviouslyCompleted = partitions - numTotalForThisExecution;
    int numCurrentCompleted = 0;
    int numCurrentSubmitted = 0;

    // All partitions have already completed on a previous execution.
    if (numTotalForThisExecution == 0) {
        return;
    }

    // Start up to the max number we are allowed from the threads attribute.
    for (int i = 0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
        submitPartitionWorkUnit(parallelBatchWorkUnits.get(i));
    }

    while (true) {
        try {
            if (analyzerProxy != null) {
                PartitionDataWrapper dataWrapper = analyzerStatusQueue.take();
                if (PartitionEventType.ANALYZE_COLLECTOR_DATA.equals(dataWrapper.getEventType())) {
                    try {
                        analyzerProxy.analyzeCollectorData(dataWrapper.getCollectorData());
                    } catch (Exception e) {
                        ExceptionConfig.wrapBatchException(e);
                    }
                    continue; // collector data only: no partition finished, nothing new to submit
                } else if (PartitionEventType.ANALYZE_STATUS.equals(dataWrapper.getEventType())) {
                    try {
                        analyzerProxy.analyzeStatus(dataWrapper.getBatchstatus(), dataWrapper.getExitStatus());
                    } catch (Exception e) {
                        ExceptionConfig.wrapBatchException(e);
                    }
                    completedWork.add(completedWorkQueue.take()); // Shouldn't be a long wait.
                } else {
                    throw new IllegalStateException("Invalid partition state");
                }
            } else {
                // Block until at least one thread has finished so that more batch work can
                // be submitted. Hold on to the finished work to look at later.
                completedWork.add(completedWorkQueue.take());
            }
        } catch (final InterruptedException e) {
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new BatchContainerRuntimeException(e);
        }

        numCurrentCompleted++;
        if (numCurrentCompleted >= numTotalForThisExecution) {
            break;
        }
        if (numCurrentSubmitted < numTotalForThisExecution) {
            // A slot was freed: submit the next pending work unit using the same
            // start/restart rule as the initial batch above.
            submitPartitionWorkUnit(parallelBatchWorkUnits.get(numCurrentSubmitted++));
        }
    }
}

/**
 * Submits a single partition work unit, restarting it when the step has been started
 * before and the plan does not override partitions, and starting it fresh otherwise.
 *
 * @param workUnit the partition work unit to hand to the kernel service
 * @throws JobRestartException declared for the restart path
 *         (presumably raised by {@code kernelService.restartGeneratedJob} - confirm)
 */
private void submitPartitionWorkUnit(final BatchWorkUnit workUnit) throws JobRestartException {
    if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
        kernelService.restartGeneratedJob(workUnit);
    } else {
        kernelService.startGeneratedJob(workUnit);
    }
}