in kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/ShardingService.java [108:126]
public void shardingIfNecessary() {
    // The available instances are read up front, before any of the waiting steps below,
    // and this same list is what the sharding strategy receives later.
    List<JobInstance> candidates = instanceService.getAvailableJobInstances();
    if (!isNeedSharding()) {
        return;
    }
    if (candidates.isEmpty()) {
        // No instance to assign items to.
        return;
    }
    // An instance that is not the leader never computes the assignment itself;
    // it only calls blockUntilShardingCompleted() and returns.
    if (!leaderService.isLeaderUntilBlock()) {
        blockUntilShardingCompleted();
        return;
    }
    // Leader path from here on.
    waitingOtherShardingItemCompleted();
    JobConfiguration loadedConfig = configService.load(false);
    int totalItems = loadedConfig.getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    // PROCESSING is written with an empty value before the sharding info is reset.
    // NOTE(review): presumably the in-progress marker that blockUntilShardingCompleted() polls - confirm.
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    resetShardingInfo(totalItems);
    // The strategy implementation is looked up by the type name stored in the job configuration.
    JobShardingStrategy strategy = TypedSPILoader.getService(JobShardingStrategy.class, loadedConfig.getJobShardingStrategyType());
    // The strategy's assignment is converted to transaction operations and submitted in one executeInTransaction call.
    jobNodeStorage.executeInTransaction(
            getShardingResultTransactionOperations(strategy.sharding(candidates, jobName, totalItems)));
    log.debug("Job '{}' sharding complete.", jobName);
}