in odps-sdk-impl/odps-mapred-local/src/main/java/com/aliyun/odps/mapred/LocalJobRunner.java [404:496]
/**
 * Prepares local input splits for one input table of the job.
 *
 * <p>If the requested table/partition is not present in the local warehouse, or the warehouse
 * download mode is {@code ALWAYS}, the table schema and data are downloaded first. Then, for the
 * table itself (non-partitioned) or for every warehouse partition matching the requested
 * partition spec (partitioned), the selected columns are copied into the job's input directory.
 * One {@link FileSplit} per resulting data file is then added to {@code inputs} and mapped back
 * to {@code tableInfo} in {@code splitToTableInfo}. Partitions (or tables) whose warehouse
 * directory has no data files are skipped.
 *
 * <p>Note: when {@code tableInfo} has no project name, it is filled in place with the default
 * project of the warehouse's Odps instance.
 *
 * @param tableInfo the input table description; must be non-null with a non-blank table name
 * @throws IllegalArgumentException if {@code tableInfo} is null or has a blank table name
 * @throws RuntimeException if the table/partition is still missing from the warehouse after
 *     downloading it
 * @throws IOException if a partition spec is given for a non-partitioned table, or as
 *     propagated from the warehouse/download helpers
 * @throws OdpsException as propagated from the warehouse/download helpers
 */
private void processInput(TableInfo tableInfo) throws IOException, OdpsException {
  if (tableInfo == null || StringUtils.isBlank(tableInfo.getTableName())) {
    // IllegalArgumentException extends RuntimeException, so existing callers are unaffected.
    throw new IllegalArgumentException("Invalid TableInfo: " + tableInfo);
  }
  if (StringUtils.isEmpty(tableInfo.getProjectName())) {
    tableInfo.setProjectName(wareHouse.getOdps().getDefaultProject());
  }
  String[] readCols = tableInfo.getCols();
  // Partition spec the MR job was asked to process.
  PartitionSpec expectParts = tableInfo.getPartitionSpec();
  // This step only downloads the table schema and data into the local warehouse; everything
  // after it is the same as for a table that already exists locally.
  if (!wareHouse
      .existsPartition(tableInfo.getProjectName(), tableInfo.getTableName(), expectParts)
      || wareHouse.getDownloadMode() == DownloadMode.ALWAYS) {
    DownloadUtils
        .downloadTableSchemeAndData(odps, tableInfo, wareHouse.getLimitDownloadRecordCount(),
            wareHouse.getInputColumnSeperator());
    if (!wareHouse
        .existsPartition(tableInfo.getProjectName(), tableInfo.getTableName(), expectParts)) {
      throw new RuntimeException(LocalRunUtils.getDownloadErrorMsg(tableInfo.toString()));
    }
  }
  // Table metadata read from the warehouse _scheme_ file; it mirrors the server-side table.
  TableMeta whTblMeta =
      wareHouse.getTableMeta(tableInfo.getProjectName(), tableInfo.getTableName());
  Column[] whReadFields = LocalRunUtils.getInputTableFields(whTblMeta, readCols);
  List<PartitionSpec> whParts =
      wareHouse.getPartitions(tableInfo.getProjectName(), tableInfo.getTableName());
  if (whParts.size() > 0) {
    // Partitioned table.
    for (PartitionSpec partSpec : whParts) {
      // Skip partitions that the job did not ask for.
      if (!PartitionUtils.match(expectParts, partSpec)) {
        continue;
      }
      File whSrcDir = wareHouse.getPartitionDir(whTblMeta.getProjName(),
          whTblMeta.getTableName(), partSpec);
      // Only add input splits when the source directory actually has data files.
      if (LocalRunUtils.listDataFiles(whSrcDir).size() > 0) {
        // Keep the local temp directory layout identical to the warehouse layout.
        File tempDataDir = jobDirecotry.getInputDir(wareHouse.getRelativePath(
            whTblMeta.getProjName(), whTblMeta.getTableName(), partSpec));
        File tempSchemeDir = jobDirecotry.getInputDir(wareHouse.getRelativePath(
            whTblMeta.getProjName(), whTblMeta.getTableName(), null));
        wareHouse.copyTable(whTblMeta.getProjName(), whTblMeta.getTableName(), partSpec,
            readCols, tempSchemeDir, wareHouse.getLimitDownloadRecordCount(),
            wareHouse.getInputColumnSeperator());
        addInputSplitsForDir(tempDataDir, whReadFields, tableInfo);
      }
    }
  } else {
    // Non-partitioned table: a partition spec cannot be honored.
    if (tableInfo.getPartSpec() != null && tableInfo.getPartSpec().size() > 0) {
      throw new IOException(
          MRExceptionCode.ODPS_0720121 + "table " + tableInfo.getProjectName() + "." + tableInfo
              .getTableName()
              + " is not partitioned table");
    }
    File whSrcDir = wareHouse.getTableDir(whTblMeta.getProjName(), whTblMeta.getTableName());
    if (LocalRunUtils.listDataFiles(whSrcDir).size() > 0) {
      // Keep the local temp directory layout identical to the warehouse layout; for a
      // non-partitioned table the data directory is also the schema directory.
      File tempDataDir = jobDirecotry.getInputDir(wareHouse.getRelativePath(
          whTblMeta.getProjName(), whTblMeta.getTableName(), null));
      File tempSchemeDir = tempDataDir;
      wareHouse.copyTable(whTblMeta.getProjName(), whTblMeta.getTableName(), null, readCols,
          tempSchemeDir, wareHouse.getLimitDownloadRecordCount(),
          wareHouse.getInputColumnSeperator());
      addInputSplitsForDir(tempDataDir, whReadFields, tableInfo);
    }
  }
}

/**
 * Registers one whole-file {@link FileSplit} per data file in {@code dataDir} as a job input.
 *
 * <p>Each split is appended to {@code inputs} and mapped to {@code tableInfo} in
 * {@code splitToTableInfo}.
 *
 * @param dataDir local directory whose data files become input splits
 * @param fields the columns read from the input table, attached to every split
 * @param tableInfo the input table the splits belong to
 * @throws IOException as propagated from the file-listing/split helpers
 * @throws OdpsException as propagated from the file-listing/split helpers
 */
private void addInputSplitsForDir(File dataDir, Column[] fields, TableInfo tableInfo)
    throws IOException, OdpsException {
  for (File file : LocalRunUtils.listDataFiles(dataDir)) {
    FileSplit split = new FileSplit(file, fields, 0L, file.length());
    splitToTableInfo.put(split, tableInfo);
    inputs.add(split);
  }
}