in jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java [144:269]
/**
 * Builds the partition plan for the current step and caches its partition count,
 * thread count and per-partition properties on this controller.
 * <p>
 * The plan comes from the step's partition mapper when one is configured, otherwise
 * from the static plan element of the JSL.
 *
 * @return the plan that was built, never {@code null}
 * @throws IllegalArgumentException if the step declares neither a mapper nor a plan, or
 *                                  if a static plan attribute cannot be parsed or is out of range
 * @throws IllegalStateException    if the partition mapper does not produce a plan
 */
private PartitionPlan generatePartitionPlan() {
    // Partition count persisted by a previous run of this step; null when there is none.
    final Integer previousNumPartitions = stepStatus.getNumPartitions();
    final org.apache.batchee.jaxb.PartitionMapper partitionMapper = step.getPartition().getMapper();

    final PartitionPlan plan;
    if (partitionMapper != null) { //from partition mapper
        plan = planFromMapper(partitionMapper, previousNumPartitions);
    } else if (step.getPartition().getPlan() != null) { //from static partition element in jsl
        plan = planFromStaticJsl();
    } else {
        // Fail with a clear message instead of a NullPointerException further down.
        throw new IllegalArgumentException("Neither a partition mapper nor a partition plan is defined in stepId: "
                + step.getId());
    }

    // Set the other instance variables for convenience.
    this.partitions = plan.getPartitions();
    this.threads = plan.getThreads();
    this.partitionProperties = plan.getPartitionProperties();
    return plan;
}

/**
 * Builds the plan by invoking the user supplied partition mapper.
 *
 * @param partitionMapper       the JSL mapper definition of the step
 * @param previousNumPartitions partition count of a previous run, or {@code null} if none was persisted
 * @return the resulting plan
 */
private PartitionPlan planFromMapper(final org.apache.batchee.jaxb.PartitionMapper partitionMapper,
                                     final Integer previousNumPartitions) {
    final List<Property> propertyList = partitionMapper.getProperties() == null ? null
            : partitionMapper.getProperties().getPropertyList();

    // Set all the contexts associated with this controller.
    // Some of them may be null
    final InjectionReferences injectionRef =
            new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propertyList);
    final PartitionMapper partitionMapperProxy =
            ProxyFactory.createPartitionMapperProxy(factory, partitionMapper.getRef(), injectionRef, jobExecutionImpl);

    PartitionPlan mapperPlan = null;
    try {
        mapperPlan = partitionMapperProxy.mapPartitions();
    } catch (Exception e) {
        // NOTE(review): presumably this rethrows; the null check below covers the case where it does not.
        ExceptionConfig.wrapBatchException(e);
    }
    if (mapperPlan == null) {
        throw new IllegalStateException("Partition mapper " + partitionMapper.getRef()
                + " did not produce a partition plan in stepId: " + step.getId());
    }

    //Set up the new partition plan
    final PartitionPlan plan = new BatchPartitionPlan();
    plan.setPartitionsOverride(mapperPlan.getPartitionsOverride());

    //When true is specified, the partition count from the current run
    //is used and all results from past partitions are discarded.
    if (mapperPlan.getPartitionsOverride() || previousNumPartitions == null) {
        plan.setPartitions(mapperPlan.getPartitions());
    } else {
        plan.setPartitions(previousNumPartitions);
    }

    // A thread count of 0 means one thread per partition.
    if (mapperPlan.getThreads() == 0) {
        plan.setThreads(plan.getPartitions());
    } else {
        plan.setThreads(mapperPlan.getThreads());
    }

    plan.setPartitionProperties(mapperPlan.getPartitionProperties());
    return plan;
}

/**
 * Builds the plan from the static plan element declared in the JSL of the step.
 * Must only be called when the step's partition actually declares a plan.
 *
 * @return the resulting plan
 * @throws IllegalArgumentException if the partitions, threads or a properties target
 *                                  partition value cannot be parsed or is out of range
 */
private PartitionPlan planFromStaticJsl() {
    final String partitionsAttr = step.getPartition().getPlan().getPartitions();

    // The partitions attribute is optional; JSR-352 defines its default as 1.
    int numPartitions = 1;
    if (partitionsAttr != null) {
        try {
            numPartitions = Integer.parseInt(partitionsAttr);
        } catch (final NumberFormatException e) {
            throw new IllegalArgumentException("Could not parse partition instances value in stepId: " + step.getId()
                    + ", with instances=" + partitionsAttr, e);
        }
        // Validate before sizing the properties array so a negative value is reported
        // as a configuration error and not as a NegativeArraySizeException.
        if (numPartitions < 1) {
            throw new IllegalArgumentException("Partition instances value must be 1 or greater in stepId: " + step.getId()
                    + ", with instances=" + partitionsAttr);
        }
    }

    final String threadsAttr = step.getPartition().getPlan().getThreads();
    int numThreads = numPartitions; //default to number of partitions if threads isn't set
    if (threadsAttr != null) {
        try {
            numThreads = Integer.parseInt(threadsAttr);
        } catch (final NumberFormatException e) {
            throw new IllegalArgumentException("Could not parse partition threads value in stepId: " + step.getId()
                    + ", with threads=" + threadsAttr, e);
        }
        if (numThreads < 0) {
            throw new IllegalArgumentException("Threads value must be 0 or greater in stepId: " + step.getId()
                    + ", with threads=" + threadsAttr);
        }
        if (numThreads == 0) {
            // 0 means one thread per partition.
            numThreads = numPartitions;
        }
    }

    // One slot per partition; slots without a matching properties element stay null.
    final Properties[] partitionProps = new Properties[numPartitions];
    final List<JSLProperties> jslProperties = step.getPartition().getPlan().getProperties();
    if (jslProperties != null) {
        for (final JSLProperties props : jslProperties) {
            int targetPartition;
            try {
                targetPartition = Integer.parseInt(props.getPartition());
            } catch (final NumberFormatException e) {
                throw new IllegalArgumentException("Could not parse partition properties target in stepId: " + step.getId()
                        + ", with partition=" + props.getPartition(), e);
            }
            try {
                partitionProps[targetPartition] = CloneUtility.jslPropertiesToJavaProperties(props);
            } catch (ArrayIndexOutOfBoundsException e) {
                throw new BatchContainerRuntimeException("There are only " + numPartitions + " partition instances, but there are "
                        + jslProperties.size()
                        + " partition properties lists defined. Remember that partition indexing is 0 based like Java arrays.", e);
            }
        }
    }

    final PartitionPlan plan = new BatchPartitionPlan();
    plan.setPartitions(numPartitions);
    plan.setThreads(numThreads);
    plan.setPartitionProperties(partitionProps);
    plan.setPartitionsOverride(false); //FIXME what is the default for a static plan??
    return plan;
}