in odps-sdk-impl/odps-graph-local/src/main/java/com/aliyun/odps/graph/local/LocalGraphJobRunner.java [160:252]
/**
 * Prepares local input splits for one input table of the graph job.
 *
 * <p>If the table (schema + data) is not yet cached in the local warehouse — or the download
 * mode forces a refresh — it is downloaded from the remote ODPS service first. The table data
 * is then copied into the job's temporary input directory (mirroring the warehouse directory
 * layout), and one {@link InputSplit} is added to {@code inputs} per data file found.
 *
 * @param tableInfo the input table descriptor; may carry a partition spec to restrict which
 *                  partitions of a partitioned table are processed
 * @throws IOException   if the table is not partitioned but a partition spec was supplied
 * @throws OdpsException if downloading the table from the remote host fails
 */
private void processInput(TableInfo tableInfo) throws IOException, OdpsException {
  LOG.info("Processing input: " + tableInfo);
  String projName = tableInfo.getProjectName();
  if (projName == null) {
    // No explicit project on the table info: fall back to the session's default project.
    projName = SessionState.get().getOdps().getDefaultProject();
  }
  String tblName = tableInfo.getTableName();
  String[] readCols = tableInfo.getCols();

  // Build the partition filter the job asked for (if any) as a PartitionSpec,
  // joining "key=value" pairs with commas in the order the user declared them.
  LinkedHashMap<String, String> expectPartsHashMap = tableInfo.getPartSpec();
  PartitionSpec expectParts = null;
  if (expectPartsHashMap != null && !expectPartsHashMap.isEmpty()) {
    // StringBuilder suffices here: this is single-threaded, local construction.
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> entry : expectPartsHashMap.entrySet()) {
      if (sb.length() > 0) {
        sb.append(",");
      }
      sb.append(entry.getKey()).append("=").append(entry.getValue());
    }
    expectParts = new PartitionSpec(sb.toString());
  }

  // This part only downloads the table schema and data into the local warehouse;
  // everything afterwards works the same as a purely local run.
  if (!wareHouse.existsTable(projName, tblName)
      || wareHouse.getDownloadMode() == DownloadMode.ALWAYS) {
    DownloadUtils
        .downloadTableSchemeAndData(odps, tableInfo, wareHouse.getLimitDownloadRecordCount(),
                                    wareHouse.getInputColumnSeperator());
    if (!wareHouse.existsTable(projName, tblName)) {
      throw new OdpsException("download table from remote host failure");
    }
  }

  // Metadata read from the warehouse __schema__ — equivalent to the server-side table info.
  TableMeta whTblMeta = wareHouse.getTableMeta(projName, tblName);
  Column[] whReadFields = LocalRunUtils.getInputTableFields(whTblMeta, readCols);
  List<PartitionSpec> whParts = wareHouse.getPartitions(projName, tblName);

  if (!whParts.isEmpty()) {
    // Partitioned table: copy each matching partition and create splits from its data files.
    for (PartitionSpec partSpec : whParts) {
      // Skip partitions that do not match the requested partition filter.
      if (!match(expectParts, partSpec)) {
        continue;
      }
      File whSrcDir = wareHouse.getPartitionDir(whTblMeta.getProjName(),
                                                whTblMeta.getTableName(), partSpec);
      // Add input splits only when the source dir actually has data files.
      if (!LocalRunUtils.listDataFiles(whSrcDir).isEmpty()) {
        // Keep the local temp directory layout consistent with the warehouse layout.
        File tempDataDir = jobDirecotry.getInputDir(wareHouse.getRelativePath(
            whTblMeta.getProjName(), whTblMeta.getTableName(), partSpec));
        File tempSchemeDir = jobDirecotry.getInputDir(wareHouse.getRelativePath(
            whTblMeta.getProjName(), whTblMeta.getTableName(), null));
        wareHouse.copyTable(whTblMeta.getProjName(), whTblMeta.getTableName(), partSpec,
                            readCols, tempSchemeDir, wareHouse.getLimitDownloadRecordCount(),
                            wareHouse.getInputColumnSeperator());
        for (File file : LocalRunUtils.listDataFiles(tempDataDir)) {
          inputs.add(new InputSplit(file, whReadFields, 0L, file.length(), tableInfo));
        }
      }
    }
  } else {
    // Non-partitioned table: a partition spec on the request is a user error.
    if (tableInfo.getPartSpec() != null && !tableInfo.getPartSpec().isEmpty()) {
      throw new IOException(ExceptionCode.ODPS_0720121 + "table "
                            + projName + "." + tblName + " is not partitioned table");
    }
    File whSrcDir = wareHouse.getTableDir(whTblMeta.getProjName(),
                                          whTblMeta.getTableName());
    if (!LocalRunUtils.listDataFiles(whSrcDir).isEmpty()) {
      // Keep the local temp directory layout consistent with the warehouse layout.
      File tempDataDir = jobDirecotry.getInputDir(wareHouse.getRelativePath(
          whTblMeta.getProjName(), whTblMeta.getTableName(), null));
      File tempSchemeDir = tempDataDir;
      wareHouse.copyTable(whTblMeta.getProjName(), whTblMeta.getTableName(), null, readCols,
                          tempSchemeDir, wareHouse.getLimitDownloadRecordCount(),
                          wareHouse.getInputColumnSeperator());
      for (File file : LocalRunUtils.listDataFiles(tempDataDir)) {
        inputs.add(new InputSplit(file, whReadFields, 0L, file.length(), tableInfo));
      }
    }
  }
}