in schedulerx-demo/schedulerx-example-springboot/src/main/java/com/aliyun/schedulerx/example/processor/ScanShardingTableJob.java [33:70]
/**
 * Dispatches one step of a multi-level map job that fans out from databases
 * to tables to id-range pages.
 *
 * <ul>
 *   <li>Root task: maps every name returned by {@code getDbList()} as a "DbTask".</li>
 *   <li>"DbTask": maps every table of that database as a "TableTask".</li>
 *   <li>"TableTask": splits the table's [minId, maxId] span into pages of at most
 *       {@code PAGE_SIZE} ids and maps them as "PageTask".</li>
 *   <li>"PageTask": queries the records of that page.</li>
 * </ul>
 *
 * @param context job context supplying the task name and the task payload
 * @return the result of {@code map(...)} for the fan-out levels, a successful
 *         result once a page has been queried, or a failed result when the
 *         task name is not one of the names handled here
 * @throws Exception if any of the lookup or query helpers fails
 */
public ProcessResult process(JobContext context) throws Exception {
    String taskName = context.getTaskName();
    Object task = context.getTask();

    if (isRootTask(context)) {
        // Level 1: shard by database first.
        List<String> dbList = getDbList();
        return map(dbList, "DbTask");
    }

    // Constant-first equals: a null task name falls through to the failure
    // result at the bottom instead of throwing a NullPointerException.
    if ("DbTask".equals(taskName)) {
        // Level 2: split each database into its tables.
        String dbName = (String) task;
        List<String> tableList = getTableList(dbName);
        return map(tableList, "TableTask");
    }

    if ("TableTask".equals(taskName)) {
        // Level 3: a single table can still be large, so paginate it by id range.
        String tableName = (String) task;
        Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
        long minId = idPair.getFirst();
        long maxId = idPair.getSecond();
        List<PageTask> tasks = Lists.newArrayList();

        // NOTE(review): when minId == maxId no page is produced and map() receives
        // an empty list; whether that drops a row depends on how queryRecord treats
        // its bounds - confirm against its implementation.
        // Assumes PAGE_SIZE is positive; otherwise this loop would not advance.
        long startId = minId;
        while (startId < maxId) {
            // Decide from the remaining span instead of startId + PAGE_SIZE, so the
            // addition is only performed when it cannot overflow. A negative
            // 'remaining' means the subtraction itself wrapped, i.e. the real span
            // is larger than any page and startId is negative, so adding is safe.
            long remaining = maxId - startId;
            long endId = (remaining < 0 || remaining > PAGE_SIZE) ? startId + PAGE_SIZE : maxId;
            tasks.add(new PageTask(tableName, startId, endId));
            // Consecutive pages share their boundary id, exactly as before.
            startId = endId;
        }
        return map(tasks, "PageTask");
    }

    if ("PageTask".equals(taskName)) {
        // Leaf level: load the records of one page.
        PageTask pageTask = (PageTask) task;
        String tableName = pageTask.getTableName();
        long startId = pageTask.getStartId();
        long endId = pageTask.getEndId();
        List<Record> records = queryRecord(tableName, startId, endId);
        //TODO handle records
        return new ProcessResult(true);
    }

    // Unknown (or null) task name.
    return new ProcessResult(false);
}