in odps-console-dship/src/main/java/com/aliyun/odps/ship/download/DshipDownload.java [104:160]
/**
 * Builds the download work items for the configured table.
 *
 * <p>For an unpartitioned table, or when exactly one partition is selected, the data is handed
 * to {@code splitDataByThreads}, which splits it across the download threads. When two or more
 * partitions are selected, one {@link FileDownloader} is created per partition. Each one writes
 * to its own file, whose name is the base file name plus the partition suffix (and the
 * extension, if any). When {@code limit} is set, partitions are scheduled in order until the
 * limit is used up.
 *
 * @param odps ODPS client used to inspect the table's partitions
 * @throws OdpsException if a partition spec is given for an unpartitioned table, or if no
 *     partition can be inferred from the given spec
 * @throws IOException propagated from the session/splitting helpers used here
 * @throws ODPSConsoleException propagated from the session/splitting helpers used here
 */
public void initTableDownloadWorkItems(Odps odps)
    throws IOException, ODPSConsoleException, OdpsException {
  PartitionHelper helper = new PartitionHelper(odps, projectName, schemaName, tableName);

  if (!helper.isPartitioned()) {
    if (partitonSpecLiteral != null) {
      throw new OdpsException(
          Constants.ERROR_INDICATOR + "can not specify partition for an unpartitioned table");
    }
    splitDataByThreads(new TunnelDownloadSession(tableName, null), null);
    return;
  }

  List<PartitionSpec> parSpecs = helper.inferPartitionSpecs(partitonSpecLiteral);
  if (parSpecs.isEmpty()) {
    throw new OdpsException(Constants.ERROR_INDICATOR + "can not infer any partitions from: "
        + partitonSpecLiteral);
  }

  if (parSpecs.size() == 1) {
    // A single selected partition is downloaded like a whole table: its data is split into
    // as many slices as there are download threads.
    PartitionSpec ps = parSpecs.get(0);
    splitDataByThreads(new TunnelDownloadSession(tableName, ps), ps);
    return;
  }

  // Two or more partitions: one slice (and one output file) per partition.
  // NOTE(review): slices is the number of selected partitions even when `limit` stops the loop
  // early, so it can exceed workItems.size(); confirm how callers use it before changing it.
  slices = parSpecs.size();
  long sliceId = 0;
  long start = 0; // rows scheduled so far across all partitions
  for (PartitionSpec ps : parSpecs) {
    // Stop once the row limit is used up. '>=' (rather than '==') also stops immediately for a
    // negative limit instead of scheduling a slice with a negative line count.
    if (limit != null && start >= limit) {
      break;
    }
    TunnelDownloadSession tds = new TunnelDownloadSession(tableName, ps);
    SessionHistory sh = tds.getSessionHistory();
    long partitionLines = tds.getTotalLines();
    String msg = ps.toString() + "\tnew session: " + tds.getDownloadId() + "\ttotal lines: "
        + Util.toReadableNumber(partitionLines);
    System.err.println(sim.format(new Date()) + " - " + msg);
    sh.log(msg);

    // Download the whole partition, or only what remains of the row limit.
    long step = (limit == null) ? partitionLines : Math.min(partitionLines, limit - start);

    String sliceFileName = filename + PartitionHelper.buildSuffix(ps);
    if (StringUtils.isNotEmpty(ext)) {
      sliceFileName = sliceFileName + "." + ext;
    }
    // Updates the `path` field; after the loop it holds the last scheduled slice's file path.
    path = parentDir + sliceFileName;
    FileDownloader sd = new FileDownloader(path, sliceId, 0L, step, tds, sh, isCsv, ps);
    workItems.add(sd);
    sliceId++;
    start += step;
  }
  // Total number of rows scheduled across all partition slices.
  totalLines = start;
}